Skip to main content

lib/rulestead/store/ecto.ex

# credo:disable-for-this-file
defmodule Rulestead.Store.Ecto do
  import Ecto.Query

  alias Ecto.Changeset
  alias Ecto.ConstraintError
  alias Ecto.Multi

  alias Rulestead.{
    Admin.Redaction,
    Admin.Lifecycle,
    Audience,
    Governance.Approval,
    Governance.AudienceMutationChangeRequest,
    Governance.BlastRadiusThreshold,
    Governance.ChangeRequest,
    Governance.ExecutionAttempt,
    Governance.RolloutAutoAdvance,
    Governance.RolloutAutoAdvance.Schedule,
    Governance.ScheduledExecution,
    AuditEvent,
    CodeRefs.CodeReference,
    CodeRefs.ScanReceipt,
    Context,
    Environment,
    EnvironmentVersion,
    Flag,
    FlagEnvironment,
    GuardrailDecision,
    Guardrails.AutoAdvance,
    Guardrails.Decision,
    Guardrails.SignalFact,
    Manifest.Import,
    Oban,
    Promotion.Apply,
    Promotion.Compare,
    Repo,
    RolloutAutoAdvancePolicy,
    RuntimeSnapshot,
    Ruleset,
    RulesetError,
    Store,
    StoreError,
    Targeting.AudienceDependencies,
    Targeting.AudienceReferenceProjection,
    Targeting.DependencyInventory,
    Targeting.DependencyValidator,
    Targeting.ImpactPreview,
    Targeting.PreviewEvidence,
    Targeting.PreviewEvidence.Limits,
    Telemetry,
    Webhooks.Destination,
    Webhooks.Delivery
  }

  alias Rulestead.Runtime.{Config, Notifier}
  alias Rulestead.Store.Command

  @snapshot_schema_version 1
  @scheduled_execution_retry_limit 3

  @behaviour Store

  @impl Store
  def fetch_flag(%Command.FetchFlag{} = command) do
    with {:ok, environment} <- fetch_environment(command.environment_key),
         {:ok, flag, flag_environment} <-
           fetch_flag_environment(command.flag_key, environment.key) do
      lifecycle_context = lifecycle_context_for_flags([flag.key])

      {:ok,
       build_flag_detail_payload(
         flag,
         environment,
         flag_environment,
         command.include_ruleset?,
         lifecycle_context
       )}
    end
  end

  @impl Store
  def compare_environments(%Command.CompareEnvironments{} = command) do
    with {:ok, source_environment} <- fetch_environment(command.source_environment_key),
         {:ok, target_environment} <- fetch_environment(command.target_environment_key) do
      flags =
        compare_flags_query(
          source_environment.key,
          target_environment.key,
          command.flag_keys
        )
        |> Repo.all()

      audiences =
        from(audience in Audience)
        |> Repo.all()
        |> Map.new(&{&1.key, audience_summary(&1)})

      {:ok,
       Compare.compare_projected(%{
         source_environment: environment_summary(source_environment),
         target_environment: environment_summary(target_environment),
         requested_flag_keys: command.flag_keys,
         compare_token: command.compare_token,
         tenant_key: command.tenant_key,
         source_flags: compare_payloads(flags, source_environment),
         target_flags: compare_payloads(flags, target_environment),
         audiences: audiences
       })}
    end
  end

  @impl Store
  def apply_promotion(%Command.ApplyPromotion{} = command) do
    run_promotion_apply(command, allow_protected_target?: false)
  rescue
    error in [ConstraintError] ->
      {:error, StoreError.unavailable(cause: error)}
  end

  @impl Store
  def preview_manifest_import(%Command.PreviewManifestImport{} = command) do
    with {:ok, target_manifest} <- Rulestead.export_manifest(command.target_environment_key) do
      {:ok,
       Import.preview(command.manifest, target_manifest,
         target_environment_key: command.target_environment_key
       )}
    end
  end

  @impl Store
  def apply_manifest_import(%Command.ApplyManifestImport{} = command) do
    run_manifest_import_apply(command, allow_protected_target?: false)
  rescue
    error in [ConstraintError] ->
      {:error, StoreError.unavailable(cause: error)}
  end

  @doc false
  @spec rebuild_audience_reference_projection() ::
          {:ok, %{deleted_rows: non_neg_integer(), inserted_rows: non_neg_integer()}}
          | {:error, Rulestead.Error.t()}
  def rebuild_audience_reference_projection do
    case refresh_audience_reference_projection(nil) do
      {:ok, result} -> {:ok, result}
      {:error, error} -> {:error, error}
    end
  end

  @impl Store
  @spec list_audience_dependencies(Command.ListAudienceDependencies.t()) ::
          {:ok,
           %{
             entries: [DependencyInventory.entry()],
             limit: pos_integer(),
             offset: non_neg_integer(),
             returned: non_neg_integer(),
             total_count: non_neg_integer()
           }}
          | {:error, Rulestead.Error.t()}
  def list_audience_dependencies(%Command.ListAudienceDependencies{} = command) do
    limit = command_limit(command)
    offset = command_offset(command)

    all_entries =
      AudienceReferenceProjection
      |> maybe_filter_projection_scope(command)
      |> Repo.all()
      |> Enum.map(&projection_row_to_entry/1)
      |> DependencyInventory.sort_entries()

    page_entries = all_entries |> Enum.drop(offset) |> Enum.take(limit)
    redacted = Redaction.redact_dependency_inventory(page_entries, redaction_options(command))

    {:ok,
     %{
       entries: redacted.entries,
       reference_count: redacted.reference_count,
       hidden_reference_count: redacted.hidden_reference_count,
       redacted: redacted.redacted,
       redacted_entries: redacted.redacted_entries,
       limit: limit,
       offset: offset,
       returned: length(redacted.entries),
       total_count: length(all_entries)
     }}
  rescue
    error in [ConstraintError] ->
      {:error, StoreError.unavailable(cause: error)}
  end

  def list_audience_dependencies(command) when is_map(command) do
    command
    |> list_audience_dependencies_command()
    |> list_audience_dependencies()
  end

  defp run_promotion_apply(%Command.ApplyPromotion{} = command, opts) do
    allow_protected_target? = Keyword.get(opts, :allow_protected_target?, false)

    with {:ok, _source_environment} <- fetch_environment(command.source_environment_key),
         {:ok, target_environment} <- fetch_environment(command.target_environment_key),
         :ok <-
           ensure_promotion_target_allowed(
             target_environment.key,
             allow_protected_target?
           ),
         # Fail closed for direct apply and replay/re-apply paths.
         :ok <-
           Apply.validate_live_dependencies(
             command,
             dependency_validation_audiences(),
             message: "promotion apply blocked by dependency validation"
           ),
         {:ok, compare} <-
           compare_environments(
             Command.CompareEnvironments.new(
               command.source_environment_key,
               command.target_environment_key,
               flag_keys: command.flag_keys,
               compare_token: command.compare_token,
               tenant_key: command.tenant_key
             )
           ),
         :ok <-
           Apply.validate_with_compare(command, compare,
             allow_protected_target?: allow_protected_target?
           ) do
      published_at = now()

      Multi.new()
      |> Multi.run(:applied_flags, fn repo, _changes ->
        apply_promotion_bundle(repo, target_environment, command, published_at)
      end)
      |> Multi.run(:environment_version, fn repo, %{applied_flags: applied_flags} ->
        insert_environment_version(repo, target_environment, command, applied_flags)
      end)
      |> Multi.run(:runtime_snapshot, fn repo, _changes ->
        insert_runtime_snapshot(repo, target_environment, published_at)
      end)
      |> Repo.transact()
      |> case do
        {:ok, %{environment_version: environment_version, runtime_snapshot: runtime_snapshot}} ->
          _ = refresh_audience_reference_projection(target_environment.key)

          {:ok,
           %{
             source_environment_key: command.source_environment_key,
             target_environment_key: command.target_environment_key,
             compare_token: command.compare_token,
             compare_schema_version: command.compare_schema_version,
             applied_flag_keys: command.flag_keys,
             dependency_closure_keys: command.dependency_closure_keys,
             environment_version_id: environment_version.id,
             environment_version_version: environment_version.version,
             snapshot_version: runtime_snapshot.version
           }}

        {:error, _operation, %Rulestead.Error{} = error, _changes} ->
          {:error, error}

        {:error, _operation, %Changeset{} = changeset, _changes} ->
          {:error,
           StoreError.invalid_command("promotion apply could not be persisted", cause: changeset)}

        {:error, _operation, reason, _changes} ->
          {:error, StoreError.unavailable(cause: reason)}
      end
    else
      {:error, error} ->
        {:error, error}
    end
  end

  defp run_manifest_import_apply(%Command.ApplyManifestImport{} = command, opts) do
    with {:ok, target_environment} <- fetch_environment(command.target_environment_key),
         :ok <-
           ensure_promotion_target_allowed(
             target_environment.key,
             Keyword.get(opts, :allow_protected_target?, false)
           ) do
      published_at = now()

      Multi.new()
      |> Multi.run(:applied_flags, fn repo, _changes ->
        apply_manifest_import_bundle(repo, target_environment, command, published_at)
      end)
      |> Multi.run(:environment_version, fn repo, %{applied_flags: applied_flags} ->
        insert_manifest_import_environment_version(
          repo,
          target_environment,
          command,
          applied_flags,
          published_at
        )
      end)
      |> Multi.run(:runtime_snapshot, fn repo, _changes ->
        insert_runtime_snapshot(repo, target_environment, published_at)
      end)
      |> Repo.transact()
      |> case do
        {:ok, %{environment_version: environment_version, runtime_snapshot: runtime_snapshot}} ->
          _ = refresh_audience_reference_projection(target_environment.key)

          {:ok,
           %{
             source_environment_key: command.source_environment_key,
             target_environment_key: command.target_environment_key,
             plan_token: command.plan_token,
             applied_flag_keys: command.flag_keys,
             dependency_closure_keys: command.dependency_closure_keys,
             environment_version_id: environment_version.id,
             environment_version_version: environment_version.version,
             snapshot_version: runtime_snapshot.version
           }}

        {:error, _operation, %Rulestead.Error{} = error, _changes} ->
          {:error, error}

        {:error, _operation, %Changeset{} = changeset, _changes} ->
          {:error,
           StoreError.invalid_command("manifest import could not be persisted", cause: changeset)}

        {:error, _operation, reason, _changes} ->
          {:error, StoreError.unavailable(cause: reason)}
      end
    end
  end

  @impl Store
  def fetch_snapshot(%Command.FetchSnapshot{} = command) do
    with {:ok, environment} <- fetch_environment(command.environment_key) do
      command
      |> runtime_snapshot_query(environment.key)
      |> Repo.one()
      |> case do
        nil ->
          {:error,
           StoreError.snapshot_not_found(
             environment.key,
             metadata: snapshot_lookup_metadata(environment.key, command.version)
           )}

        snapshot ->
          {:ok, serialize_runtime_snapshot(snapshot)}
      end
    end
  end

  @impl Store
  def create_flag(%Command.CreateFlag{} = command) do
    with {:ok, environments} <- create_environments(command.environment_keys) do
      attrs = %{
        key: to_string(command.key),
        description: command.description,
        flag_type: command.flag_type,
        value_type: command.value_type,
        default_value: command.default_value,
        ownership: command.ownership,
        lifecycle: command.lifecycle,
        tags: command.tags
      }

      Multi.new()
      |> Multi.insert(:flag, Flag.changeset(%Flag{}, attrs))
      |> Multi.run(:flag_environments, fn repo, %{flag: flag} ->
        insert_flag_environments(repo, flag, environments)
      end)
      |> Repo.transact()
      |> case do
        {:ok, %{flag: flag}} ->
          flag = flag_by_key_query(flag.key) |> Repo.one()
          {:ok, build_create_payload(flag)}

        {:error, :flag, %Changeset{} = changeset, _changes} ->
          {:error, store_changeset_error(changeset, command.key, :all)}

        {:error, :flag_environments, %Rulestead.Error{} = error, _changes} ->
          {:error, error}

        {:error, _operation, reason, _changes} ->
          {:error, StoreError.unavailable(cause: reason)}
      end
    end
  rescue
    error in [ConstraintError] ->
      {:error, StoreError.unavailable(cause: error)}
  end

  @impl Store
  def update_flag(%Command.UpdateFlag{} = command) do
    case flag_by_key_query(command.flag_key) |> Repo.one() do
      nil ->
        {:error, StoreError.flag_not_found(command.flag_key, :all)}

      flag ->
        with :ok <- ensure_not_archived(command.flag_key, flag) do
          attrs =
            %{}
            |> maybe_put_update_field(:description, command.description)
            |> maybe_put_update_field(:ownership, command.ownership)
            |> maybe_put_update_field(:tags, command.tags)
            |> maybe_put_lifecycle_update(command)

          flag
          |> Flag.changeset(attrs)
          |> Repo.update()
          |> case do
            {:ok, updated_flag} ->
              updated_flag = flag_by_key_query(updated_flag.key) |> Repo.one()
              {:ok, build_update_payload(updated_flag, flag.ownership.owner_ref)}

            {:error, %Changeset{} = changeset} ->
              {:error, store_changeset_error(changeset, command.flag_key, :all)}
          end
        end
    end
  rescue
    error in [ConstraintError] ->
      {:error, StoreError.unavailable(cause: error)}
  end

  @impl Store
  def save_draft_ruleset(%Command.SaveDraftRuleset{} = command) do
    case audit_result(command) do
      :denied ->
        with {:ok, audit_event} <-
               insert_audit_only_event(command, audit_event_type(command), :denied) do
          {:ok, %{audit_event: AuditEvent.serialize(audit_event)}}
        end

      _other ->
        with {:ok, environment} <- fetch_environment(command.environment_key),
             {:ok, flag, flag_environment} <-
               fetch_flag_environment(command.flag_key, environment.key),
             :ok <- ensure_not_archived(command.flag_key, flag) do
          attrs = %{
            flag_environment_id: flag_environment.id,
            version: next_ruleset_version(flag_environment.id),
            status: :draft,
            salt: Map.get(command.ruleset, :salt) || Map.get(command.ruleset, "salt"),
            published_at: nil,
            metadata:
              Map.get(command.ruleset, :metadata) || Map.get(command.ruleset, "metadata") || %{},
            rules: Map.get(command.ruleset, :rules) || Map.get(command.ruleset, "rules") || []
          }

          %Ruleset{}
          |> Ruleset.changeset(attrs)
          |> Repo.insert()
          |> case do
            {:ok, ruleset} ->
              {:ok, %{version: ruleset.version, ruleset: serialize_ruleset(ruleset)}}

            {:error, %Changeset{} = changeset} ->
              {:error, ruleset_error(changeset, command.flag_key, command.environment_key)}
          end
        end
    end
  rescue
    error in [ConstraintError] ->
      {:error, StoreError.unavailable(cause: error)}
  end

  @impl Store
  def publish_ruleset(%Command.PublishRuleset{} = command) do
    case audit_result(command) do
      :denied ->
        with {:ok, audit_event} <-
               insert_audit_only_event(command, audit_event_type(command), :denied) do
          {:ok, %{audit_event: AuditEvent.serialize(audit_event)}}
        end

      _other ->
        with {:ok, environment} <- fetch_environment(command.environment_key),
             {:ok, flag, flag_environment} <-
               fetch_flag_environment(command.flag_key, environment.key),
             :ok <- ensure_not_archived(command.flag_key, flag),
             {:ok, ruleset} <-
               resolve_publishable_ruleset(flag_environment, environment.key, command.version) do
          dependency_entries =
            publish_dependency_entries(environment.key, command, flag, ruleset)

          dependency_findings = validate_dependency_entries(command, dependency_entries)

          if DependencyValidator.blockers?(dependency_findings) do
            error =
              DependencyValidator.to_error(dependency_findings,
                message: "ruleset publish blocked by dependency validation"
              )

            _ = insert_blocked_publish_event(command, ruleset, dependency_findings)

            {:error, error}
          else
            published_at = now()
            previous_ruleset = active_ruleset(flag_environment)

            Multi.new()
            |> Multi.update(
              :ruleset,
              Ruleset.changeset(ruleset, %{status: :published, published_at: published_at})
            )
            |> Multi.update(
              :flag_environment,
              FlagEnvironment.changeset(flag_environment, %{
                active_ruleset_id: ruleset.id,
                status: :active,
                last_published_at: published_at
              })
            )
            |> Multi.update(:flag, Changeset.change(flag, updated_at: published_at))
            |> Multi.run(:runtime_snapshot, fn repo, _changes ->
              insert_runtime_snapshot(repo, environment, published_at)
            end)
            |> audit_multi(:audit_event, command, ruleset, environment, previous_ruleset)
            |> Repo.transact()
            |> case do
              {:ok, _changes} ->
                _ = refresh_audience_reference_projection(command.environment_key)
                fetch_flag(Command.FetchFlag.new(command.flag_key, command.environment_key))

              {:error, :ruleset, %Changeset{} = changeset, _changes} ->
                {:error, ruleset_error(changeset, command.flag_key, command.environment_key)}

              {:error, :flag_environment, %Changeset{} = changeset, _changes} ->
                {:error,
                 store_changeset_error(changeset, command.flag_key, command.environment_key)}

              {:error, _operation, reason, _changes} ->
                {:error, StoreError.unavailable(cause: reason)}
            end
          end
        end
    end
  rescue
    error in [ConstraintError] ->
      {:error, StoreError.unavailable(cause: error)}
  end

  @impl Store
  def archive_flag(%Command.ArchiveFlag{} = command) do
    case audit_result(command) do
      :denied ->
        with {:ok, audit_event} <-
               insert_audit_only_event(command, audit_event_type(command), :denied) do
          {:ok, %{audit_event: AuditEvent.serialize(audit_event)}}
        end

      _other ->
        case flag_by_key_query(command.flag_key) |> Repo.one() do
          nil ->
            {:error, StoreError.flag_not_found(command.flag_key, :all)}

          flag ->
            archived_at = flag.archived_at || now()

            Multi.new()
            |> Multi.update(:flag, Flag.changeset(flag, %{archived_at: archived_at}))
            |> Multi.update_all(
              :flag_environments,
              from(fe in FlagEnvironment, where: fe.flag_id == ^flag.id),
              set: [status: :archived, updated_at: archived_at]
            )
            |> audit_multi(:audit_event, command, nil, nil, nil)
            |> Repo.transact()
            |> case do
              {:ok, _changes} ->
                archived_flag = flag_by_key_query(command.flag_key) |> Repo.one()

                Enum.each(
                  archived_flag.flag_environments,
                  &insert_runtime_snapshot(Repo, &1.environment, archived_at)
                )

                {:ok, build_archive_payload(archived_flag)}

              {:error, :flag, %Changeset{} = changeset, _changes} ->
                {:error, store_changeset_error(changeset, command.flag_key, :all)}

              {:error, _operation, reason, _changes} ->
                {:error, StoreError.unavailable(cause: reason)}
            end
        end
    end
  end

  @impl Store
  def list_flags(%Command.ListFlags{} = command) do
    with {:ok, environment_filter} <- list_environment_filter(command.environment_key) do
      raw_entries =
        from(flag in Flag,
          join: fe in FlagEnvironment,
          on: fe.flag_id == flag.id,
          join: env in Environment,
          on: env.id == fe.environment_id,
          preload: [flag_environments: {fe, [:environment, :active_ruleset]}]
        )
        |> maybe_filter_environment(environment_filter)
        |> maybe_filter_archived(command.include_archived?)
        |> Repo.all()
        |> Enum.flat_map(fn flag ->
          Enum.map(flag.flag_environments, fn flag_environment ->
            %{
              flag: flag,
              environment: flag_environment.environment,
              flag_environment: flag_environment
            }
          end)
        end)

      lifecycle_context = lifecycle_context_for_flags(Enum.map(raw_entries, & &1.flag.key))

      entries =
        raw_entries
        |> Enum.map(&build_list_entry(&1, lifecycle_context))
        |> maybe_filter_query(command.query)
        |> maybe_filter_owner(command.owner)
        |> maybe_filter_tags(command.tags)
        |> maybe_filter_lifecycle(command.lifecycle)
        |> maybe_filter_stale(command.stale)
        |> maybe_filter_readiness(command.readiness)
        |> maybe_filter_evidence_quality(command.evidence_quality)
        |> maybe_filter_flag_type(command.flag_type)
        |> sort_entries(command.sort)

      {:ok, paginate_entries(entries, command)}
    end
  end

  defp maybe_filter_flag_type(entries, nil), do: entries

  defp maybe_filter_flag_type(entries, flag_type) do
    Enum.filter(entries, fn entry ->
      entry.flag.flag_type == flag_type
    end)
  end

  @impl Store
  def list_environments(%Command.ListEnvironments{} = command) do
    environments =
      Environment
      |> maybe_filter_environment_query(command.query)
      |> order_by([environment], asc: environment.key)
      |> limit(^command.limit)
      |> Repo.all()
      |> Enum.map(&environment_summary/1)

    {:ok, environments}
  end

  @impl Store
  def list_audiences(%Command.ListAudiences{} = command) do
    audiences =
      Audience
      |> maybe_filter_archived_audiences(command.include_archived?)
      |> maybe_filter_audience_query(command.query)
      |> order_by([audience], asc: audience.key)
      |> limit(^command.limit)
      |> Repo.all()
      |> Enum.map(&audience_summary/1)

    {:ok, audiences}
  end

  @impl Store
  def preview_audience_impact(%Command.PreviewAudienceImpact{} = command) do
    with {:ok, environment} <- fetch_environment(command.environment_key || "test"),
         {:ok, audience} <- fetch_audience_for_mutation(command.audience_key),
         :ok <- ensure_audience_active(audience) do
      audience_preview_payload(Repo, environment.key, audience, command)
    end
  end

  @impl Store
  def apply_audience_mutation(%Command.ApplyAudienceMutation{} = command) do
    if audit_result(command) == :denied do
      insert_audience_audit_only_event(command, "audience.mutation_blocked", :denied)
    else
      apply_confirmed_audience_mutation(command)
    end
  rescue
    error in [ConstraintError] ->
      {:error, StoreError.unavailable(cause: error)}
  end

  defp apply_confirmed_audience_mutation(%Command.ApplyAudienceMutation{} = command, opts \\ []) do
    governed_apply? = Keyword.get(opts, :governed_apply?, false)
    published_at = now()

    Multi.new()
    |> Multi.run(:environment, fn _repo, _changes ->
      fetch_environment(command.environment_key)
    end)
    |> Multi.run(:audience, fn _repo, _changes ->
      fetch_audience_for_mutation(command.audience_key)
    end)
    |> Multi.run(:preview, fn repo, %{environment: environment, audience: audience} ->
      with :ok <- ensure_audience_active(audience),
           :ok <- ensure_supported_audience_operation(command),
           :ok <- ensure_audience_preview_schema(command),
           {:ok, preview} <- audience_preview_payload(repo, environment.key, audience, command),
           :ok <- ensure_fresh_audience_preview(command, preview),
           :ok <-
             validate_blast_radius_threshold(command, preview, governed_apply?: governed_apply?) do
        {:ok, preview}
      end
    end)
    |> Multi.run(:dependency_validation, fn _repo, %{preview: preview} ->
      dependency_entries = audience_dependency_entries(preview, command)

      dependency_findings =
        validate_dependency_entries(command, dependency_entries,
          expected_reference_keys: command.affected_reference_keys
        )

      if DependencyValidator.blockers?(dependency_findings) do
        {:error,
         DependencyValidator.to_error(dependency_findings,
           message: "audience mutation blocked by dependency validation"
         )}
      else
        {:ok, dependency_findings}
      end
    end)
    |> Multi.run(:audience_mutation, fn repo, %{audience: audience} ->
      apply_audience_operation(repo, audience, command, published_at)
    end)
    |> Multi.run(:runtime_snapshot, fn repo, %{environment: environment} ->
      insert_runtime_snapshot(repo, environment, published_at)
    end)
    |> Multi.run(:audit_event, fn repo,
                                  %{
                                    audience: audience,
                                    audience_mutation: updated_audience,
                                    environment: environment,
                                    preview: preview
                                  } ->
      repo.insert(
        audience_audit_event_changeset(
          %AuditEvent{},
          command,
          audience_event_type(command.operation),
          :ok,
          %{
            environment_key: environment.key,
            before: audience_audit_state(audience),
            after: audience_audit_state(updated_audience),
            preview: preview,
            dependency_findings: []
          }
        )
      )
    end)
    |> Repo.transact()
    |> case do
      {:ok,
       %{
         audience_mutation: audience,
         preview: preview,
         runtime_snapshot: snapshot,
         audit_event: audit_event
       }} ->
        _ = refresh_audience_reference_projection(command.environment_key)

        {:ok,
         %{
           result: :ok,
           operation: command.operation,
           audience: audience_summary(audience),
           preview: preview,
           snapshot_version: snapshot.version,
           audit_event: AuditEvent.serialize(audit_event)
         }}

      {:error, _operation, %Rulestead.Error{} = error, changes} ->
        if Keyword.get(opts, :audit_blocked?, true) do
          _ = insert_blocked_audience_event(command, error, preview: Map.get(changes, :preview))
        end

        {:error, error}

      {:error, _operation, %Changeset{} = changeset, changes} ->
        error =
          StoreError.invalid_command("audience mutation could not be persisted", cause: changeset)

        if Keyword.get(opts, :audit_blocked?, true) do
          _ = insert_blocked_audience_event(command, error, preview: Map.get(changes, :preview))
        end

        {:error, error}

      {:error, _operation, reason, changes} ->
        error = StoreError.unavailable(cause: reason)

        if Keyword.get(opts, :audit_blocked?, true) do
          _ = insert_blocked_audience_event(command, error, preview: Map.get(changes, :preview))
        end

        {:error, error}
    end
  end

  @impl Store
  def record_evaluation(%Command.RecordEvaluation{} = command) do
    with {:ok, environment} <- fetch_environment(command.environment_key),
         {:ok, _flag, flag_environment} <-
           fetch_flag_environment(command.flag_key, environment.key) do
      timestamp = DateTime.truncate(command.last_evaluated_at, :microsecond)

      next_timestamp =
        case flag_environment.last_evaluated_at do
          %DateTime{} = existing ->
            case DateTime.compare(existing, timestamp) do
              :gt -> existing
              _ -> timestamp
            end

          _ ->
            timestamp
        end

      flag_environment
      |> FlagEnvironment.changeset(%{last_evaluated_at: next_timestamp})
      |> Repo.update()
      |> case do
        {:ok, updated} ->
          {:ok,
           %{
             flag_key: to_string(command.flag_key),
             environment_key: environment.key,
             last_evaluated_at: updated.last_evaluated_at
           }}

        {:error, %Changeset{} = changeset} ->
          {:error, store_changeset_error(changeset, command.flag_key, command.environment_key)}
      end
    end
  end

  @impl Store
  def advance_rollout(%Command.AdvanceRollout{} = command) do
    with {:ok, environment} <- fetch_environment(command.environment_key),
         {:ok, flag, flag_environment} <-
           fetch_flag_environment(command.flag_key, environment.key),
         :ok <- ensure_not_archived(command.flag_key, flag),
         {:ok, active_ruleset} <- ensure_active_ruleset(flag_environment, command),
         {:ok, rollout_rule} <- resolve_rollout_rule(active_ruleset, command.rule_key),
         {:ok, percentage} <- ensure_rollout_percentage(command.percentage),
         {:ok, next_ruleset_attrs} <-
           advanced_ruleset_attrs(active_ruleset, rollout_rule.key, percentage) do
      published_at = now()
      previous_ruleset = active_ruleset

      Multi.new()
      |> Multi.insert(
        :ruleset,
        Ruleset.changeset(%Ruleset{}, %{
          flag_environment_id: flag_environment.id,
          version: next_ruleset_version(flag_environment.id),
          status: :published,
          salt: next_ruleset_attrs.salt,
          published_at: published_at,
          metadata: next_ruleset_attrs.metadata,
          rules: next_ruleset_attrs.rules
        })
      )
      |> Multi.run(:flag_environment, fn repo, %{ruleset: ruleset} ->
        flag_environment
        |> FlagEnvironment.changeset(%{
          active_ruleset_id: ruleset.id,
          status: :active,
          last_published_at: published_at
        })
        |> repo.update()
      end)
      |> Multi.update(:flag, Changeset.change(flag, updated_at: published_at))
      |> Multi.run(:runtime_snapshot, fn repo, _changes ->
        insert_runtime_snapshot(repo, environment, published_at)
      end)
      |> Multi.run(:decision, fn repo, %{ruleset: ruleset} ->
        repo.insert(
          GuardrailDecision.changeset(
            %GuardrailDecision{},
            advance_decision_attrs(command, ruleset, rollout_rule, published_at)
          )
        )
      end)
      |> Multi.run(:audit_event, fn repo, %{ruleset: ruleset, decision: decision} ->
        repo.insert(
          audit_event_changeset(%AuditEvent{}, command, "rollout.advance", :ok, %{
            environment_key: environment.key,
            before: ruleset_audit_state(previous_ruleset),
            after: ruleset_audit_state(ruleset),
            diff: ruleset_position_diff(previous_ruleset, ruleset),
            links: %{"guardrail_decision_id" => decision.id}
          })
        )
      end)
      |> Repo.transact()
      |> case do
        {:ok, %{decision: decision, ruleset: ruleset}} ->
          maybe_schedule_auto_advance_tick(command, %{decision: decision, ruleset: ruleset})
          {:ok, guardrail_status_payload(decision, ruleset.version)}

        {:error, :ruleset, %Changeset{} = changeset, _changes} ->
          {:error, ruleset_error(changeset, command.flag_key, command.environment_key)}

        {:error, :flag_environment, %Changeset{} = changeset, _changes} ->
          {:error, store_changeset_error(changeset, command.flag_key, command.environment_key)}

        {:error, :decision, %Changeset{} = changeset, _changes} ->
          {:error, store_changeset_error(changeset, command.flag_key, command.environment_key)}

        {:error, _operation, reason, _changes} ->
          {:error, StoreError.unavailable(cause: reason)}
      end
    end
  end

  @impl Store
  def evaluate_guarded_rollout(%Command.EvaluateGuardedRollout{} = command) do
    with {:ok, environment} <- fetch_environment(command.environment_key),
         {:ok, flag, flag_environment} <-
           fetch_flag_environment(command.flag_key, environment.key),
         :ok <- ensure_not_archived(command.flag_key, flag),
         {:ok, active_ruleset} <- ensure_active_ruleset(flag_environment, command),
         {:ok, rollout_rule} <- resolve_rollout_rule(active_ruleset, command.rule_key) do
      signal_facts = Enum.map(command.signal_facts, &SignalFact.new/1)

      evaluated =
        Decision.evaluate(signal_facts,
          evaluated_at: now(),
          monitoring_window_ends_at: command.monitoring_window_ends_at
        )

      execute_guardrail_decision(
        command,
        environment,
        flag,
        flag_environment,
        active_ruleset,
        rollout_rule,
        evaluated
      )
    end
  end

  @impl Store
  def upsert_rollout_auto_advance_policy(%Command.UpsertRolloutAutoAdvancePolicy{} = command) do
    with :ok <- Command.UpsertRolloutAutoAdvancePolicy.validate_required_fields(command) do
      attrs = auto_advance_policy_attrs(command)

      %RolloutAutoAdvancePolicy{}
      |> RolloutAutoAdvancePolicy.changeset(attrs)
      |> Repo.insert(
        on_conflict: {:replace_all_except, [:id, :inserted_at]},
        conflict_target: [:flag_key, :environment_key, :rule_key]
      )
      |> case do
        {:ok, policy} ->
          {:ok, %{policy: serialize_rollout_auto_advance_policy(policy)}}

        {:error, %Changeset{} = changeset} ->
          {:error, auto_advance_policy_changeset_error(changeset, command)}
      end
    else
      {:error, errors} when is_map(errors) ->
        {:error,
         StoreError.invalid_command("rollout auto-advance policy is invalid",
           metadata: %{
             flag_key: command.flag_key,
             environment_key: command.environment_key,
             rule_key: command.rule_key
           },
           details: auto_advance_policy_field_errors(errors)
         )}
    end
  end

  @impl Store
  def fetch_rollout_auto_advance_policy(%Command.FetchRolloutAutoAdvancePolicy{} = command) do
    case Repo.get_by(RolloutAutoAdvancePolicy,
           flag_key: command.flag_key,
           environment_key: command.environment_key,
           rule_key: command.rule_key
         ) do
      nil ->
        {:error, rollout_auto_advance_policy_not_found_error(command)}

      policy ->
        {:ok, %{policy: serialize_rollout_auto_advance_policy(policy)}}
    end
  end

  @impl Store
  def evaluate_rollout_auto_advance(%Command.EvaluateRolloutAutoAdvance{} = command) do
    fetch_command =
      Command.FetchRolloutAutoAdvancePolicy.new(
        command.flag_key,
        command.environment_key,
        command.rule_key
      )

    with {:ok, %{policy: policy}} <- fetch_rollout_auto_advance_policy(fetch_command) do
      evaluated_at =
        command.evaluated_at ||
          now()
          |> DateTime.truncate(:second)

      AutoAdvance.evaluate_eligibility(policy, %{
        signal_facts: command.signal_facts,
        monitoring_window_ends_at: command.monitoring_window_ends_at,
        evaluated_at: evaluated_at
      })
      |> case do
        {:ok, eligibility} -> {:ok, %{eligibility: eligibility}}
      end
    end
  end

  @impl Store
  def fetch_guardrail_status(%Command.FetchGuardrailStatus{} = command) do
    with {:ok, _environment} <- fetch_environment(command.environment_key),
         {:ok, _flag, flag_environment} <-
           fetch_flag_environment(command.flag_key, command.environment_key),
         %GuardrailDecision{} = decision <- latest_guardrail_decision(command) do
      {:ok, guardrail_status_payload(decision, active_ruleset_version(flag_environment))}
    else
      nil ->
        {:error, StoreError.invalid_command("guardrail status was not found")}

      {:error, error} ->
        {:error, error}
    end
  end

  @impl Store
  def engage_kill_switch(%Command.EngageKillSwitch{} = command) do
    case audit_result(command) do
      :denied ->
        with {:ok, audit_event} <- insert_audit_only_event(command, "kill_switch.engage", :denied) do
          {:ok, %{audit_event: AuditEvent.serialize(audit_event)}}
        end

      _other ->
        with {:ok, environment} <- fetch_environment(command.environment_key),
             {:ok, flag, flag_environment} <-
               fetch_flag_environment(command.flag_key, environment.key),
             :ok <- ensure_not_archived(command.flag_key, flag) do
          before_state = audit_state(flag_environment)

          Multi.new()
          |> Multi.update(
            :flag_environment,
            FlagEnvironment.changeset(flag_environment, %{
              status: :killswitched,
              kill_switch_variant_key: "default"
            })
          )
          |> Multi.run(:runtime_snapshot, fn repo, _changes ->
            insert_runtime_snapshot(repo, environment, now())
          end)
          |> Multi.insert(
            :audit_event,
            audit_event_changeset(%AuditEvent{}, command, "kill_switch.engage", :ok, %{
              before: before_state,
              after: %{"status" => :killswitched, "kill_switch_variant_key" => "default"}
            })
          )
          |> Repo.transact()
          |> case do
            {:ok, _changes} ->
              fetch_flag(Command.FetchFlag.new(command.flag_key, command.environment_key))

            {:error, :flag_environment, %Changeset{} = changeset, _changes} ->
              {:error,
               store_changeset_error(changeset, command.flag_key, command.environment_key)}

            {:error, _operation, reason, _changes} ->
              {:error, StoreError.unavailable(cause: reason)}
          end
        end
    end
  end

  @impl Store
  def release_kill_switch(%Command.ReleaseKillSwitch{} = command) do
    case audit_result(command) do
      :denied ->
        with {:ok, audit_event} <-
               insert_audit_only_event(command, "kill_switch.release", :denied) do
          {:ok, %{audit_event: AuditEvent.serialize(audit_event)}}
        end

      _other ->
        with {:ok, environment} <- fetch_environment(command.environment_key),
             {:ok, flag, flag_environment} <-
               fetch_flag_environment(command.flag_key, environment.key),
             :ok <- ensure_not_archived(command.flag_key, flag) do
          before_state = audit_state(flag_environment)

          next_status =
            if(flag_environment.status == :killswitched,
              do: :active,
              else: flag_environment.status || :active
            )

          Multi.new()
          |> Multi.update(
            :flag_environment,
            FlagEnvironment.changeset(flag_environment, %{
              status: next_status,
              kill_switch_variant_key: nil
            })
          )
          |> Multi.run(:runtime_snapshot, fn repo, _changes ->
            insert_runtime_snapshot(repo, environment, now())
          end)
          |> Multi.insert(
            :audit_event,
            audit_event_changeset(%AuditEvent{}, command, "kill_switch.release", :ok, %{
              before: before_state,
              after: %{"status" => next_status, "kill_switch_variant_key" => nil}
            })
          )
          |> Repo.transact()
          |> case do
            {:ok, _changes} ->
              fetch_flag(Command.FetchFlag.new(command.flag_key, command.environment_key))

            {:error, :flag_environment, %Changeset{} = changeset, _changes} ->
              {:error,
               store_changeset_error(changeset, command.flag_key, command.environment_key)}

            {:error, _operation, reason, _changes} ->
              {:error, StoreError.unavailable(cause: reason)}
          end
        end
    end
  end

  @impl Store
  def list_audit_events(%Command.ListAuditEvents{} = command) do
    entries =
      AuditEvent
      |> maybe_filter_audit_flag(command.flag_key)
      |> maybe_filter_audit_environment(command.environment_key)
      |> maybe_filter_audit_actor_id(command.actor_id)
      |> maybe_filter_audit_mutation(command.mutation)
      |> maybe_filter_audit_occurred_after(command.occurred_after)
      |> maybe_filter_audit_occurred_before(command.occurred_before)
      |> order_by([event], desc: event.occurred_at, desc: event.inserted_at)
      |> limit(^command.limit)
      |> Repo.all()
      |> Enum.map(&AuditEvent.serialize/1)

    {:ok,
     %Command.Page{
       entries: entries,
       limit: command.limit,
       has_next_page?: false,
       has_previous_page?: false
     }}
  end

  @impl Store
  def rollback_audit_event(%Command.RollbackAuditEvent{} = command) do
    case Repo.get(AuditEvent, command.audit_event_id) do
      nil ->
        {:error, StoreError.invalid_command("audit event was not found")}

      audit_event ->
        rollback_audit_event(command, audit_event)
    end
  end

  @impl Store
  def submit_change_request(%Command.SubmitChangeRequest{} = command) do
    with {:ok, command} <- prepare_audience_mutation_change_request(command) do
      correlation_id = governance_correlation_id(command)
      submitted_at = now()

      Multi.new()
      |> Multi.run(:change_request, fn repo, _changes ->
        insert_change_request(repo, command, correlation_id, submitted_at)
      end)
      |> Multi.run(:audit_event, fn repo, %{change_request: change_request} ->
        audit_command = governance_audit_command(command, change_request, "submitted")

        repo.insert(
          audit_event_changeset(%AuditEvent{}, audit_command, "change_request.submitted", :ok, %{
            resource_key: change_request.resource_key,
            environment_key: change_request.environment_key
          })
        )
      end)
      |> enqueue_webhook_deliveries(
        "change_request.submitted",
        fn %{change_request: cr} ->
          %{
            "change_request_id" => cr.id,
            "status" => to_string(cr[:status] || cr[:state]),
            "governed_action" => to_string(cr[:governed_action] || cr[:action]),
            "reason" => cr.reason,
            "submitter" => Map.take(cr, [:submitter_id, :submitter_type, :submitter_display])
          }
        end,
        environment_key: command.environment_key,
        resource_type: command.resource_type || "flag",
        resource_key: command.resource_key
      )
      |> Repo.transact()
      |> case do
        {:ok, %{change_request: change_request, audit_event: audit_event}} ->
          emit_governance_telemetry(:submitted, command, change_request, audit_event)
          {:ok, %{change_request: serialize_change_request_row(change_request)}}

        {:error, _operation, reason, _changes} ->
          {:error, normalize_governance_failure(reason)}
      end
    end
  end

  @impl Store
  def approve_change_request(%Command.ApproveChangeRequest{} = command) do
    with {:ok, change_request} <- fetch_change_request_row(command.change_request_id),
         :ok <- ensure_governance_transition(change_request, ["submitted"]),
         :ok <- ensure_unique_reviewer(change_request.id, command) do
      reviewed_at = now()

      Multi.new()
      |> Multi.run(:approval, fn repo, _changes ->
        insert_approval(repo, change_request, command, "approved", reviewed_at)
      end)
      |> Multi.run(:change_request, fn repo, _changes ->
        approved_count = approved_count(repo, change_request.id) + 1

        next_status =
          if(approved_count >= required_approvals(change_request.approval_requirement_snapshot),
            do: "approved",
            else: "submitted"
          )

        update_change_request(repo, change_request, %{
          status: next_status,
          resolved_at: if(next_status == "approved", do: reviewed_at, else: nil),
          updated_at: reviewed_at
        })
      end)
      |> Multi.run(:audit_event, fn repo,
                                    %{approval: approval, change_request: updated_change_request} ->
        audit_command =
          governance_audit_command(command, updated_change_request, "approved")
          |> Map.update!(:metadata, &Map.put(&1, "approval_id", approval.id))

        repo.insert(
          audit_event_changeset(%AuditEvent{}, audit_command, "change_request.approved", :ok, %{
            resource_key: updated_change_request.resource_key,
            environment_key: updated_change_request.environment_key
          })
        )
      end)
      |> enqueue_webhook_deliveries(
        "change_request.approved",
        fn %{change_request: cr} ->
          %{
            "change_request_id" => cr.id,
            "status" => to_string(cr[:status] || cr[:state]),
            "governed_action" => to_string(cr[:governed_action] || cr[:action]),
            "reason" => cr.reason,
            "submitter" => Map.take(cr, [:submitter_id, :submitter_type, :submitter_display])
          }
        end,
        environment_key: change_request.environment_key,
        resource_type: change_request.resource_type,
        resource_key: change_request.resource_key
      )
      |> Repo.transact()
      |> case do
        {:ok,
         %{approval: approval, change_request: updated_change_request, audit_event: audit_event}} ->
          emit_governance_telemetry(:approved, command, updated_change_request, audit_event)

          {:ok,
           %{
             change_request: serialize_change_request_row(updated_change_request),
             approval: serialize_approval_row(approval)
           }}

        {:error, _operation, reason, _changes} ->
          {:error, normalize_governance_failure(reason)}
      end
    end
  end

  @impl Store
  def reject_change_request(%Command.RejectChangeRequest{} = command) do
    with {:ok, change_request} <- fetch_change_request_row(command.change_request_id),
         :ok <- ensure_governance_transition(change_request, ["submitted"]) do
      reviewed_at = now()

      Multi.new()
      |> Multi.run(:approval, fn repo, _changes ->
        insert_approval(repo, change_request, command, "rejected", reviewed_at)
      end)
      |> Multi.run(:change_request, fn repo, _changes ->
        update_change_request(repo, change_request, %{
          status: "rejected",
          resolved_at: reviewed_at,
          updated_at: reviewed_at
        })
      end)
      |> Multi.run(:audit_event, fn repo,
                                    %{approval: approval, change_request: updated_change_request} ->
        audit_command =
          governance_audit_command(command, updated_change_request, "rejected")
          |> Map.update!(:metadata, fn metadata ->
            Map.merge(
              metadata,
              audience_mutation_terminal_metadata(change_request, command.reason)
            )
          end)
          |> Map.update!(:metadata, &Map.put(&1, "approval_id", approval.id))

        repo.insert(
          audit_event_changeset(%AuditEvent{}, audit_command, "change_request.rejected", :ok, %{
            resource_key: updated_change_request.resource_key,
            environment_key: updated_change_request.environment_key
          })
        )
      end)
      |> enqueue_webhook_deliveries(
        "change_request.rejected",
        fn %{change_request: cr} ->
          %{
            "change_request_id" => cr.id,
            "status" => to_string(cr[:status] || cr[:state]),
            "governed_action" => to_string(cr[:governed_action] || cr[:action]),
            "reason" => cr.reason,
            "submitter" => Map.take(cr, [:submitter_id, :submitter_type, :submitter_display])
          }
        end,
        environment_key: change_request.environment_key,
        resource_type: change_request.resource_type,
        resource_key: change_request.resource_key
      )
      |> Repo.transact()
      |> case do
        {:ok, %{change_request: updated_change_request, audit_event: audit_event}} ->
          emit_governance_telemetry(:rejected, command, updated_change_request, audit_event)
          {:ok, %{change_request: serialize_change_request_row(updated_change_request)}}

        {:error, _operation, reason, _changes} ->
          {:error, normalize_governance_failure(reason)}
      end
    end
  end

  @impl Store
  def cancel_change_request(%Command.CancelChangeRequest{} = command) do
    with {:ok, change_request} <- fetch_change_request_row(command.change_request_id),
         :ok <- ensure_governance_transition(change_request, ["submitted", "approved"]) do
      cancelled_at = now()

      Multi.new()
      |> Multi.run(:change_request, fn repo, _changes ->
        update_change_request(repo, change_request, %{
          status: "cancelled",
          resolved_at: cancelled_at,
          updated_at: cancelled_at
        })
      end)
      |> Multi.run(:audit_event, fn repo, %{change_request: updated_change_request} ->
        audit_command =
          governance_audit_command(command, updated_change_request, "cancelled")
          |> Map.update!(:metadata, fn metadata ->
            Map.merge(
              metadata,
              audience_mutation_terminal_metadata(change_request, command.reason)
            )
          end)

        repo.insert(
          audit_event_changeset(%AuditEvent{}, audit_command, "change_request.cancelled", :ok, %{
            resource_key: updated_change_request.resource_key,
            environment_key: updated_change_request.environment_key
          })
        )
      end)
      |> enqueue_webhook_deliveries(
        "change_request.cancelled",
        fn %{change_request: cr} ->
          %{
            "change_request_id" => cr.id,
            "status" => to_string(cr[:status] || cr[:state]),
            "governed_action" => to_string(cr[:governed_action] || cr[:action]),
            "reason" => cr.reason,
            "submitter" => Map.take(cr, [:submitter_id, :submitter_type, :submitter_display])
          }
        end,
        environment_key: change_request.environment_key,
        resource_type: change_request.resource_type,
        resource_key: change_request.resource_key
      )
      |> Repo.transact()
      |> case do
        {:ok, %{change_request: updated_change_request}} ->
          {:ok, %{change_request: serialize_change_request_row(updated_change_request)}}

        {:error, _operation, reason, _changes} ->
          {:error, normalize_governance_failure(reason)}
      end
    end
  end

  @impl Store
  def execute_change_request(%Command.ExecuteChangeRequest{} = command) do
    with {:ok, change_request} <- fetch_change_request_row(command.change_request_id),
         :ok <- ensure_governance_transition(change_request, ["approved"]) do
      executed_at = now()

      Multi.new()
      |> Multi.run(:execution, fn _repo, _changes ->
        case execute_governed_change(change_request, command) do
          {:ok, res, cr, evt} -> {:ok, {res, cr, evt}}
          {:error, err} -> {:error, err}
        end
      end)
      |> Multi.run(:change_request, fn repo, _changes ->
        update_change_request(repo, change_request, %{
          status: "executed",
          resolved_at: executed_at,
          executed_at: executed_at,
          updated_at: executed_at
        })
      end)
      |> Multi.run(:audit_event, fn repo, %{change_request: updated_change_request} ->
        audit_command = governance_audit_command(command, updated_change_request, "merged")

        repo.insert(
          audit_event_changeset(%AuditEvent{}, audit_command, "change_request.merged", :ok, %{
            resource_key: updated_change_request.resource_key,
            environment_key: updated_change_request.environment_key
          })
        )
      end)
      |> enqueue_webhook_deliveries(
        "change_request.merged",
        fn %{change_request: cr} ->
          %{
            "change_request_id" => cr.id,
            "status" => to_string(cr[:status] || cr[:state]),
            "governed_action" => to_string(cr[:governed_action] || cr[:action]),
            "reason" => cr.reason,
            "submitter" => Map.take(cr, [:submitter_id, :submitter_type, :submitter_display])
          }
        end,
        environment_key: change_request.environment_key,
        resource_type: change_request.resource_type,
        resource_key: change_request.resource_key
      )
      |> Repo.transact()
      |> case do
        {:ok,
         %{
           execution: {execution_result, _, _},
           change_request: updated_change_request,
           audit_event: audit_event
         }} ->
          emit_governance_telemetry(:merged, command, updated_change_request, audit_event)

          {:ok,
           %{
             change_request: serialize_change_request_row(updated_change_request),
             execution_result: execution_result
           }}

        {:error, _operation, reason, _changes} ->
          {:error, normalize_governance_failure(reason)}
      end
    end
  end

  @impl Store
  def fetch_change_request(%Command.FetchChangeRequest{} = command) do
    with {:ok, change_request} <- fetch_change_request_row(command.change_request_id) do
      {:ok,
       %{
         change_request: serialize_change_request_row(change_request),
         approvals: list_approval_rows(change_request.id) |> Enum.map(&serialize_approval_row/1),
         audit_events: list_change_request_audit_events(change_request)
       }}
    end
  end

  @impl Store
  def list_change_requests(%Command.ListChangeRequests{} = command) do
    entries =
      from(cr in "change_requests",
        order_by: [desc: field(cr, :inserted_at)],
        limit: ^command.limit,
        select:
          map(cr, [
            :id,
            :status,
            :governed_action,
            :environment_key,
            :resource_type,
            :resource_key,
            :submitter_id,
            :submitter_type,
            :submitter_display,
            :reason,
            :approval_requirement_snapshot,
            :command_snapshot,
            :metadata,
            :correlation_id,
            :submitted_at,
            :resolved_at,
            :executed_at,
            :inserted_at,
            :updated_at
          ])
      )
      |> maybe_filter_change_request(:environment_key, command.environment_key)
      |> maybe_filter_change_request(:resource_type, command.resource_type)
      |> maybe_filter_change_request(:resource_key, command.resource_key)
      |> maybe_filter_change_request(:submitter_id, command.submitted_by_id)
      |> maybe_filter_change_request(
        :governed_action,
        normalize_change_request_filter(command.action)
      )
      |> maybe_filter_change_request(:status, normalize_change_request_filter(command.status))
      |> Repo.all()
      |> Enum.map(&normalize_governance_row/1)
      |> Enum.map(&serialize_change_request_row/1)

    {:ok,
     %Command.Page{
       entries: entries,
       limit: command.limit,
       has_next_page?: false,
       has_previous_page?: false
     }}
  end

  @impl Store
  def schedule_change_request(%Command.ScheduleChangeRequest{} = command) do
    with {:ok, change_request} <- fetch_change_request_row(command.change_request_id),
         :ok <- ensure_governance_transition(change_request, ["approved"]),
         {:ok, approvals} <- approved_snapshot(change_request.id) do
      attrs = %{
        state: "scheduled",
        change_request_id: uuid_param(change_request.id),
        governed_action: change_request.governed_action,
        environment_key: change_request.environment_key,
        resource_type: change_request.resource_type,
        resource_key: change_request.resource_key,
        execution_mode: "change_request",
        scheduled_by_id: actor_value(command.actor, "id"),
        scheduled_by_type: actor_value(command.actor, "type") || "operator",
        scheduled_by_display: actor_value(command.actor, "display"),
        approved_by_snapshot: approvals,
        execution_metadata: %{},
        scheduled_for: command.scheduled_for,
        executed_at: nil,
        attempt_count: 0,
        failure_reason: nil,
        last_oban_job_id: nil,
        command_snapshot: change_request.command_snapshot,
        approval_requirement_snapshot: change_request.approval_requirement_snapshot,
        metadata:
          Command.GovernanceSupport.with_tenant_provenance(
            command.metadata,
            change_request.command_snapshot
          ),
        correlation_id: change_request.correlation_id,
        idempotency_key: "scheduled_execution:change_request:#{change_request.id}",
        inserted_at: now(),
        updated_at: now()
      }

      insert_scheduled_execution(attrs, command)
    end
  end

  @impl Store
  def schedule_governed_action(%Command.ScheduleGovernedAction{} = command) do
    correlation_id = governance_correlation_id(command)
    inserted_at = now()

    idempotency_key =
      command.idempotency_key ||
        "scheduled_execution:#{correlation_id}"

    attrs = %{
      state: "scheduled",
      change_request_id: nil,
      governed_action: Atom.to_string(command.action),
      environment_key: command.environment_key,
      resource_type: command.resource_type,
      resource_key: command.resource_key,
      execution_mode: Atom.to_string(command.execution_mode),
      scheduled_by_id: actor_value(command.actor, "id"),
      scheduled_by_type: actor_value(command.actor, "type") || "operator",
      scheduled_by_display: actor_value(command.actor, "display"),
      approved_by_snapshot: [],
      execution_metadata: %{},
      scheduled_for: command.scheduled_for,
      executed_at: nil,
      attempt_count: 0,
      failure_reason: nil,
      last_oban_job_id: nil,
      command_snapshot: Command.GovernanceSupport.with_tenant_provenance(command.command),
      approval_requirement_snapshot: command.approval_requirement,
      metadata:
        Command.GovernanceSupport.with_tenant_provenance(command.metadata, command.command),
      correlation_id: correlation_id,
      idempotency_key: idempotency_key,
      inserted_at: inserted_at,
      updated_at: inserted_at
    }

    insert_scheduled_execution(attrs, command)
  end

  @impl Store
  def cancel_scheduled_execution(%Command.CancelScheduledExecution{} = command) do
    with {:ok, scheduled_execution} <-
           fetch_scheduled_execution_row(command.scheduled_execution_id),
         :ok <- ensure_scheduled_transition(scheduled_execution.state, ["scheduled", "running"]) do
      Multi.new()
      |> Multi.run(:scheduled_execution, fn repo, _changes ->
        repo.update_all(
          from(se in "scheduled_executions",
            where: field(se, :id) == ^uuid_param(scheduled_execution.id)
          ),
          set: [
            state: "cancelled",
            failure_reason: command.reason,
            executed_at: scheduled_execution.executed_at,
            execution_metadata:
              scheduled_transition_metadata(
                scheduled_execution.execution_metadata,
                "cancelled",
                command
              ),
            updated_at: now()
          ]
        )

        fetch_scheduled_execution_row(scheduled_execution.id)
      end)
      |> Multi.run(:audit_event, fn repo, %{scheduled_execution: updated} ->
        insert_scheduled_execution_audit_event(
          repo,
          updated,
          command,
          "scheduled_execution.cancelled",
          :ok
        )
      end)
      |> Repo.transact()
      |> case do
        {:ok, %{scheduled_execution: updated, audit_event: audit_event}} ->
          emit_scheduled_execution_telemetry(:cancelled, command, updated, audit_event)

          {:ok, %{scheduled_execution: serialize_scheduled_execution_row(updated), attempts: []}}

        {:error, _operation, error, _changes} ->
          {:error, normalize_governance_failure(error)}
      end
    end
  end

  @impl Store
  def requeue_scheduled_execution(%Command.RequeueScheduledExecution{} = command) do
    with {:ok, scheduled_execution} <-
           fetch_scheduled_execution_row(command.scheduled_execution_id),
         :ok <- ensure_scheduled_transition(scheduled_execution.state, ["quarantined"]) do
      Multi.new()
      |> Multi.run(:scheduled_execution, fn repo, _changes ->
        repo.update_all(
          from(se in "scheduled_executions",
            where: field(se, :id) == ^uuid_param(scheduled_execution.id)
          ),
          set: [
            state: "scheduled",
            failure_reason: nil,
            execution_metadata:
              scheduled_transition_metadata(
                scheduled_execution.execution_metadata,
                "requeued",
                command
              ),
            updated_at: now()
          ]
        )

        fetch_scheduled_execution_row(scheduled_execution.id)
      end)
      |> Multi.run(:oban_job, fn repo, %{scheduled_execution: updated} ->
        enqueue_scheduled_execution_job(repo, updated, command.actor, now())
      end)
      |> Multi.run(:persist_job_id, fn repo,
                                       %{scheduled_execution: updated, oban_job: oban_job} ->
        repo.update_all(
          from(se in "scheduled_executions", where: field(se, :id) == ^uuid_param(updated.id)),
          set: [last_oban_job_id: oban_job.id, updated_at: now()]
        )

        fetch_scheduled_execution_row(updated.id)
      end)
      |> Multi.run(:audit_event, fn repo, %{persist_job_id: updated} ->
        insert_scheduled_execution_audit_event(
          repo,
          updated,
          command,
          "scheduled_execution.requeued",
          :ok
        )
      end)
      |> Repo.transact()
      |> case do
        {:ok, %{persist_job_id: updated, audit_event: audit_event}} ->
          emit_scheduled_execution_telemetry(:requeued, command, updated, audit_event)

          {:ok,
           %{
             scheduled_execution: serialize_scheduled_execution_row(updated),
             attempts:
               list_execution_attempt_rows(updated.id)
               |> Enum.map(&serialize_execution_attempt_row/1)
           }}

        {:error, _operation, reason, _changes} ->
          {:error, normalize_governance_failure(reason)}
      end
    end
  end

  @impl Store
  def execute_scheduled_execution(%Command.ExecuteScheduledExecution{} = command) do
    with {:ok, scheduled_execution} <-
           fetch_scheduled_execution_row(command.scheduled_execution_id) do
      case scheduled_execution.state do
        "completed" ->
          {:ok,
           %{
             scheduled_execution: serialize_scheduled_execution_row(scheduled_execution),
             attempts:
               list_execution_attempt_rows(scheduled_execution.id)
               |> Enum.map(&serialize_execution_attempt_row/1)
           }}

        "cancelled" ->
          {:error, StoreError.invalid_command("scheduled execution is cancelled")}

        "quarantined" ->
          {:error, StoreError.invalid_command("scheduled execution requires explicit requeue")}

        _other ->
          if lifecycle_telemetry_enabled?(command) do
            emit_scheduled_execution_telemetry(:started, command, scheduled_execution, nil,
              attempt_count: scheduled_execution.attempt_count + 1
            )
          end

          run_scheduled_execution(command, scheduled_execution)
      end
    end
  end

  @impl Store
  def fetch_scheduled_execution(%Command.FetchScheduledExecution{} = command) do
    with {:ok, scheduled_execution} <-
           fetch_scheduled_execution_row(command.scheduled_execution_id) do
      {:ok,
       %{
         scheduled_execution: serialize_scheduled_execution_row(scheduled_execution),
         attempts:
           list_execution_attempt_rows(scheduled_execution.id)
           |> Enum.map(&serialize_execution_attempt_row/1)
       }}
    end
  end

  @impl Store
  def list_scheduled_executions(%Command.ListScheduledExecutions{} = command) do
    entries =
      from(se in "scheduled_executions",
        order_by: [asc: field(se, :scheduled_for), asc: field(se, :inserted_at)],
        limit: ^command.limit,
        select:
          map(se, [
            :id,
            :state,
            :change_request_id,
            :governed_action,
            :environment_key,
            :resource_type,
            :resource_key,
            :execution_mode,
            :scheduled_by_id,
            :scheduled_by_type,
            :scheduled_by_display,
            :approved_by_snapshot,
            :execution_metadata,
            :scheduled_for,
            :executed_at,
            :attempt_count,
            :failure_reason,
            :last_oban_job_id,
            :command_snapshot,
            :approval_requirement_snapshot,
            :metadata,
            :correlation_id,
            :idempotency_key
          ])
      )
      |> maybe_filter_scheduled_execution(:environment_key, command.environment_key)
      |> maybe_filter_scheduled_execution(:resource_type, command.resource_type)
      |> maybe_filter_scheduled_execution(:resource_key, command.resource_key)
      |> maybe_filter_scheduled_execution(:change_request_id, command.change_request_id)
      |> maybe_filter_scheduled_execution(:scheduled_by_id, command.scheduled_by_id)
      |> maybe_filter_scheduled_state(command.state)
      |> maybe_filter_scheduled_action(command.action)
      |> maybe_filter_scheduled_window(:after, command.after)
      |> maybe_filter_scheduled_window(:before, command.before)
      |> Repo.all()
      |> Enum.map(&normalize_scheduled_execution_row/1)
      |> Enum.map(&serialize_scheduled_execution_row/1)

    {:ok,
     %Command.Page{
       entries: entries,
       limit: command.limit,
       has_next_page?: false,
       has_previous_page?: false
     }}
  end

  @impl Store
  def receive_inbound_webhook(%Command.ReceiveInboundWebhook{} = command) do
    correlation_id = command.correlation_id || Ecto.UUID.generate()
    inserted_at = now()

    attrs = %{
      provider: command.provider,
      endpoint_key: command.endpoint_key,
      delivery_id: command.delivery_id,
      attempt_id: command.attempt_id,
      topic: command.topic,
      occurred_at: command.occurred_at,
      received_at: command.received_at,
      raw_body_sha256: command.raw_body_sha256,
      verification_metadata: command.verification_metadata,
      normalized_payload: command.normalized_payload,
      dedupe_key: command.dedupe_key,
      verified_state: Atom.to_string(command.verified_state),
      rejection_reason: command.rejection_reason,
      correlation_id: correlation_id,
      inserted_at: inserted_at,
      updated_at: inserted_at
    }

    Multi.new()
    |> Multi.run(:receipt, fn repo, _changes ->
      case repo.insert_all("webhook_receipts", [attrs], returning: webhook_receipt_fields()) do
        {1, [row]} -> {:ok, normalize_webhook_receipt_row(row)}
        _ -> {:error, StoreError.unavailable()}
      end
    end)
    |> Multi.run(:replay_claim, fn repo, %{receipt: receipt} ->
      if command.verified_state == :accepted and not is_nil(command.delivery_id) do
        case repo.insert_all("webhook_replay_claims", [
               %{
                 provider: command.provider,
                 delivery_id: command.delivery_id,
                 receipt_id: uuid_param(receipt.id),
                 inserted_at: inserted_at
               }
             ]) do
          {1, _} -> {:ok, :recorded}
          _ -> {:error, :duplicate}
        end
      else
        {:ok, :skipped}
      end
    end)
    |> Repo.transact()
    |> case do
      {:ok, %{receipt: receipt}} ->
        {:ok, serialize_webhook_receipt_row(receipt)}

      {:error, :replay_claim, :duplicate, _changes} ->
        {:error, StoreError.invalid_command("duplicate webhook delivery")}

      {:error, _operation, reason, _changes} ->
        {:error, normalize_governance_failure(reason)}
    end
  rescue
    error in [ConstraintError, Postgrex.Error] ->
      {:error, StoreError.invalid_command("duplicate webhook delivery", cause: error)}
  end

  @impl Store
  def fetch_webhook_record(%Command.FetchWebhookRecord{} = command) do
    case from(wr in "webhook_receipts",
           where: field(wr, :id) == ^uuid_param(command.receipt_id),
           select: map(wr, ^webhook_receipt_fields())
         )
         |> Repo.one() do
      nil -> {:error, StoreError.invalid_command("webhook record was not found")}
      row -> {:ok, serialize_webhook_receipt_row(normalize_webhook_receipt_row(row))}
    end
  end

  @impl Store
  def list_webhook_records(%Command.ListWebhookRecords{} = command) do
    entries =
      from(wr in "webhook_receipts",
        order_by: [desc: field(wr, :received_at), desc: field(wr, :inserted_at)],
        limit: ^command.limit,
        select: map(wr, ^webhook_receipt_fields())
      )
      |> maybe_filter_webhook_record(:provider, command.provider)
      |> maybe_filter_webhook_record(:endpoint_key, command.endpoint_key)
      |> maybe_filter_webhook_record(
        :verified_state,
        normalize_webhook_state_filter(command.verified_state)
      )
      |> maybe_filter_webhook_record(:topic, command.topic)
      |> Repo.all()
      |> Enum.map(&normalize_webhook_receipt_row/1)
      |> Enum.map(&serialize_webhook_receipt_row/1)

    {:ok,
     %Command.Page{
       entries: entries,
       limit: command.limit,
       has_next_page?: false,
       has_previous_page?: false
     }}
  end

  @impl Store
  def create_webhook_destination(%Command.CreateWebhookDestination{} = command) do
    attrs = %{
      name: command.name,
      description: command.description,
      url: command.url,
      secret_id: command.secret_id,
      environment_key: command.environment_key,
      subscriptions: command.subscriptions,
      enabled: command.enabled,
      metadata: command.metadata
    }

    %Destination{}
    |> Destination.changeset(attrs)
    |> Repo.insert()
    |> case do
      {:ok, destination} ->
        {:ok, serialize_webhook_destination(destination)}

      {:error, changeset} ->
        {:error, StoreError.invalid_command("failed to create destination", cause: changeset)}
    end
  end

  @impl Store
  def update_webhook_destination(%Command.UpdateWebhookDestination{} = command) do
    with {:ok, destination} <- fetch_destination_by_id(command.id) do
      attrs =
        %{
          name: command.name,
          description: command.description,
          url: command.url,
          secret_id: command.secret_id,
          subscriptions: command.subscriptions,
          enabled: command.enabled,
          metadata: command.metadata
        }
        |> Enum.reject(fn {_k, v} -> is_nil(v) end)
        |> Map.new()

      destination
      |> Destination.changeset(attrs)
      |> Repo.update()
      |> case do
        {:ok, updated} ->
          {:ok, serialize_webhook_destination(updated)}

        {:error, changeset} ->
          {:error, StoreError.invalid_command("failed to update destination", cause: changeset)}
      end
    end
  end

  @impl Store
  def fetch_webhook_destination(%Command.FetchWebhookDestination{} = command) do
    case fetch_destination_by_id(command.id) do
      {:ok, destination} -> {:ok, serialize_webhook_destination(destination)}
      error -> error
    end
  end

  @impl Store
  def list_webhook_destinations(%Command.ListWebhookDestinations{} = command) do
    entries =
      from(d in Destination,
        order_by: [asc: d.name, asc: d.inserted_at],
        limit: ^command.limit
      )
      |> maybe_filter_destination(:environment_key, command.environment_key)
      |> Repo.all()
      |> Enum.map(&serialize_webhook_destination/1)

    {:ok,
     %Command.Page{
       entries: entries,
       limit: command.limit,
       has_next_page?: false,
       has_previous_page?: false
     }}
  end

  @impl Store
  def list_webhook_deliveries(%Command.ListWebhookDeliveries{} = command) do
    entries =
      from(d in Delivery,
        order_by: [desc: d.inserted_at],
        limit: ^command.limit,
        preload: [:webhook_outbound_event, :webhook_destination]
      )
      |> maybe_filter_delivery(:webhook_destination_id, command.destination_id)
      |> maybe_filter_delivery(:webhook_outbound_event_id, command.event_id)
      |> maybe_filter_delivery(:state, command.state)
      |> Repo.all()
      |> Enum.map(&serialize_webhook_delivery/1)

    {:ok,
     %Command.Page{
       entries: entries,
       limit: command.limit,
       has_next_page?: false,
       has_previous_page?: false
     }}
  end

  @impl Store
  def retry_webhook_delivery(%Command.RetryWebhookDelivery{} = command) do
    case Repo.get(Delivery, command.delivery_id) do
      nil ->
        {:error, StoreError.invalid_command("delivery was not found")}

      delivery ->
        # Just reset state to pending, next worker run will pick it up
        delivery
        |> Delivery.changeset(%{
          state: :pending,
          attempt_count: 0,
          terminal_failure_reason: nil
        })
        |> Repo.update()
        |> case do
          {:ok, updated} ->
            {:ok, serialize_webhook_delivery(updated)}

          {:error, changeset} ->
            {:error, StoreError.invalid_command("failed to retry delivery", cause: changeset)}
        end
    end
  end

  defp fetch_environment(environment_key) do
    case Repo.get_by(Environment, key: to_string(environment_key)) do
      nil -> {:error, StoreError.environment_not_found(environment_key)}
      environment -> {:ok, environment}
    end
  end

  defp fetch_flag_environment(flag_key, environment_key) do
    case flag_with_environment_query(flag_key, environment_key) |> Repo.one() do
      nil -> {:error, StoreError.flag_not_found(flag_key, environment_key)}
      %{flag_environments: [flag_environment]} = flag -> {:ok, flag, flag_environment}
    end
  end

  defp compare_flags_query(source_environment_key, target_environment_key, flag_keys) do
    [source_environment_key, target_environment_key]
    |> then(fn environment_keys ->
      from(flag in Flag,
        join: fe in FlagEnvironment,
        on: fe.flag_id == flag.id,
        join: env in Environment,
        on: env.id == fe.environment_id,
        where: env.key in ^environment_keys,
        preload: [flag_environments: {fe, [:environment, :active_ruleset]}],
        distinct: true
      )
    end)
    |> maybe_filter_compare_flag_keys(flag_keys)
  end

  defp maybe_filter_compare_flag_keys(query, nil), do: query
  defp maybe_filter_compare_flag_keys(query, []), do: query

  defp maybe_filter_compare_flag_keys(query, flag_keys) do
    where(query, [flag], flag.key in ^flag_keys)
  end

  defp compare_payloads(flags, environment) do
    flags
    |> Enum.reduce(%{}, fn flag, payloads ->
      case Enum.find(flag.flag_environments, &(&1.environment.key == environment.key)) do
        nil ->
          payloads

        flag_environment ->
          Map.put(
            payloads,
            flag.key,
            build_flag_payload(flag, environment, flag_environment, true)
          )
      end
    end)
  end

  defp ensure_not_archived(flag_key, flag) do
    if flag.archived_at do
      {:error, StoreError.archived(flag_key)}
    else
      :ok
    end
  end

  defp next_ruleset_version(flag_environment_id) do
    from(r in Ruleset,
      where: r.flag_environment_id == ^flag_environment_id,
      select: max(r.version)
    )
    |> Repo.one()
    |> Kernel.||(0)
    |> Kernel.+(1)
  end

  defp resolve_publishable_ruleset(flag_environment, environment_key, version) do
    rulesets_query =
      from(r in Ruleset,
        where: r.flag_environment_id == ^flag_environment.id
      )

    rulesets = Repo.all(rulesets_query)

    with {:ok, publish_version} <- normalize_publish_version(version, rulesets) do
      case Enum.find(rulesets, &(&1.version == publish_version)) do
        %Ruleset{status: :draft} = ruleset ->
          {:ok, ruleset}

        %Ruleset{} ->
          {:error,
           StoreError.invalid_command(
             "requested ruleset version is not publishable",
             metadata: %{
               requested_version: publish_version,
               active_version: active_version(flag_environment)
             }
           )}

        nil ->
          {:error,
           RulesetError.not_found(
             metadata: %{requested_version: publish_version, environment_key: environment_key}
           )}
      end
    end
  end

  defp normalize_publish_version(nil, rulesets) do
    case rulesets
         |> Enum.filter(&(&1.status == :draft))
         |> Enum.map(& &1.version)
         |> Enum.max(fn -> nil end) do
      nil -> {:error, RulesetError.not_found()}
      version -> {:ok, version}
    end
  end

  defp normalize_publish_version(version, _rulesets) when is_integer(version) and version > 0,
    do: {:ok, version}

  defp normalize_publish_version(version, _rulesets) when is_binary(version) do
    case Integer.parse(version) do
      {parsed, ""} when parsed > 0 -> {:ok, parsed}
      _ -> {:error, StoreError.invalid_command("publish version must be a positive integer")}
    end
  end

  defp normalize_publish_version(_version, _rulesets),
    do: {:error, StoreError.invalid_command("publish version must be a positive integer")}

  defp active_version(flag_environment),
    do: flag_environment.active_ruleset && flag_environment.active_ruleset.version

  defp apply_promotion_bundle(repo, target_environment, command, published_at) do
    Enum.reduce_while(command.flag_keys, {:ok, []}, fn flag_key, {:ok, acc} ->
      case apply_promoted_flag(repo, flag_key, target_environment.key, command, published_at) do
        {:ok, applied_flag} -> {:cont, {:ok, [applied_flag | acc]}}
        {:error, error} -> {:halt, {:error, error}}
      end
    end)
    |> case do
      {:ok, applied_flags} -> {:ok, Enum.reverse(applied_flags)}
      {:error, error} -> {:error, error}
    end
  end

  defp apply_manifest_import_bundle(repo, target_environment, command, published_at) do
    Enum.reduce_while(command.flag_keys, {:ok, []}, fn flag_key, {:ok, acc} ->
      case apply_imported_flag(repo, flag_key, target_environment, command, published_at) do
        {:ok, applied_flag} -> {:cont, {:ok, [applied_flag | acc]}}
        {:error, error} -> {:halt, {:error, error}}
      end
    end)
    |> case do
      {:ok, applied_flags} -> {:ok, Enum.reverse(applied_flags)}
      {:error, error} -> {:error, error}
    end
  end

  defp ensure_promotion_target_allowed(_target_environment_key, true), do: :ok

  defp ensure_promotion_target_allowed(target_environment_key, false) do
    if Compare.protected_target?(target_environment_key) do
      {:error, StoreError.invalid_command("promotion to protected targets requires governance")}
    else
      :ok
    end
  end

  defp apply_promoted_flag(repo, flag_key, target_environment_key, command, published_at) do
    with {:ok, flag, flag_environment} <-
           fetch_flag_environment_for_repo(repo, flag_key, target_environment_key),
         proposed_state <- Map.get(command.proposed_target_bundle, flag_key),
         false <- is_nil(proposed_state) do
      proposed_flag = Map.get(proposed_state, "flag", %{}) |> denormalize_promoted_value()

      proposed_flag_environment =
        Map.get(proposed_state, "flag_environment", %{}) |> denormalize_promoted_value()

      proposed_ruleset =
        Map.get(proposed_state, "active_ruleset", %{}) |> denormalize_promoted_value()

      flag_attrs =
        %{}
        |> maybe_put_update_field(:description, proposed_flag["description"])
        |> maybe_put_update_field(:default_value, proposed_flag["default_value"])
        |> maybe_put_update_field(:tags, proposed_flag["tags"])

      {:ok, updated_flag} =
        flag
        |> Flag.changeset(flag_attrs)
        |> repo.update()

      {:ok, inserted_ruleset} =
        %Ruleset{}
        |> Ruleset.changeset(%{
          flag_environment_id: flag_environment.id,
          version: next_ruleset_version_for_repo(repo, flag_environment.id),
          status: :published,
          salt: proposed_ruleset["salt"],
          published_at: published_at,
          metadata: Map.get(proposed_ruleset, "metadata", %{}),
          rules: Map.get(proposed_ruleset, "rules", [])
        })
        |> repo.insert()

      {:ok, updated_flag_environment} =
        flag_environment
        |> FlagEnvironment.changeset(%{
          active_ruleset_id: inserted_ruleset.id,
          status: normalize_flag_environment_status(proposed_flag_environment["status"]),
          kill_switch_variant_key: proposed_flag_environment["kill_switch_variant_key"],
          last_published_at: published_at,
          last_evaluated_at: proposed_flag_environment["last_evaluated_at"],
          variants_served: Map.get(proposed_flag_environment, "variants_served", %{})
        })
        |> repo.update()

      {:ok,
       %{
         flag_key: flag_key,
         flag: updated_flag,
         flag_environment: updated_flag_environment,
         ruleset: inserted_ruleset
       }}
    else
      true ->
        {:error,
         StoreError.invalid_command("promotion bundle is missing a proposed target state",
           metadata: %{flag_key: flag_key}
         )}

      {:error, error} ->
        {:error, error}
    end
  rescue
    error in [Ecto.InvalidChangesetError] ->
      {:error, StoreError.invalid_command("promotion apply could not be persisted", cause: error)}
  end

  defp apply_imported_flag(repo, flag_key, target_environment, command, published_at) do
    proposed_state = Map.get(command.proposed_target_bundle, flag_key)

    if is_nil(proposed_state) do
      {:error,
       StoreError.invalid_command("manifest import plan is missing a proposed target state",
         metadata: %{flag_key: flag_key}
       )}
    else
      proposed_flag = Map.get(proposed_state, "flag", %{}) |> denormalize_promoted_value()

      proposed_flag_environment =
        Map.get(proposed_state, "flag_environment", %{}) |> denormalize_promoted_value()

      proposed_ruleset =
        Map.get(proposed_state, "active_ruleset", %{}) |> denormalize_promoted_value()

      with {:ok, flag} <- upsert_import_flag(repo, flag_key, proposed_flag),
           :ok <- ensure_not_archived(flag_key, flag),
           {:ok, flag_environment} <-
             upsert_import_flag_environment(
               repo,
               flag,
               target_environment,
               proposed_flag_environment,
               published_at
             ),
           {:ok, inserted_ruleset} <-
             insert_import_ruleset(repo, flag_environment, proposed_ruleset, published_at),
           {:ok, updated_flag_environment} <-
             finalize_import_flag_environment(
               repo,
               flag_environment,
               inserted_ruleset,
               proposed_flag_environment,
               published_at
             ) do
        {:ok,
         %{
           flag_key: flag_key,
           flag: flag,
           flag_environment: updated_flag_environment,
           ruleset: inserted_ruleset
         }}
      end
    end
  rescue
    error in [Ecto.InvalidChangesetError] ->
      {:error, StoreError.invalid_command("manifest import could not be persisted", cause: error)}
  end

  defp insert_environment_version(repo, target_environment, command, _applied_flags) do
    %EnvironmentVersion{}
    |> EnvironmentVersion.changeset(%{
      environment_key: target_environment.key,
      version: next_environment_version(repo, target_environment.key),
      authored_snapshot: command.proposed_target_bundle,
      source_environment_key: command.source_environment_key,
      target_environment_key: command.target_environment_key,
      compare_token: command.compare_token,
      source_fingerprint: command.source_fingerprint,
      target_fingerprint: command.target_fingerprint,
      dependency_closure_keys: command.dependency_closure_keys,
      applied_flag_keys: command.flag_keys,
      tenant_key: command.tenant_key,
      metadata: %{
        "actor" => command.actor || %{},
        "reason" => command.reason,
        "metadata" => command.metadata,
        "tenant" => Command.GovernanceSupport.tenant_provenance(command)
      }
    })
    |> repo.insert()
  end

  defp insert_manifest_import_environment_version(
         repo,
         target_environment,
         command,
         _applied_flags,
         published_at
       ) do
    %EnvironmentVersion{}
    |> EnvironmentVersion.changeset(%{
      environment_key: target_environment.key,
      version: next_environment_version(repo, target_environment.key),
      authored_snapshot: command.proposed_target_bundle,
      source_environment_key: command.source_environment_key,
      target_environment_key: command.target_environment_key,
      compare_token: command.plan_token,
      source_fingerprint: nil,
      target_fingerprint: command.target_fingerprint,
      dependency_closure_keys: command.dependency_closure_keys,
      applied_flag_keys: command.flag_keys,
      tenant_key: command.tenant_key,
      metadata: %{
        "mode" => "manifest_import",
        "actor" => command.actor || %{},
        "reason" => command.reason,
        "metadata" => command.metadata,
        "tenant" => Command.GovernanceSupport.tenant_provenance(command),
        "published_at" => DateTime.to_iso8601(published_at)
      }
    })
    |> repo.insert()
  end

  defp fetch_flag_environment_for_repo(repo, flag_key, environment_key) do
    flag =
      from(flag in Flag,
        where: flag.key == ^to_string(flag_key),
        join: fe in assoc(flag, :flag_environments),
        join: env in assoc(fe, :environment),
        where: env.key == ^to_string(environment_key),
        preload: [flag_environments: {fe, [:environment, :active_ruleset]}]
      )
      |> repo.one()

    case flag do
      nil ->
        {:error, StoreError.flag_not_found(flag_key, environment_key)}

      %{flag_environments: [flag_environment]} ->
        {:ok, flag, flag_environment}
    end
  end

  defp upsert_import_flag(repo, flag_key, proposed_flag) do
    flag_attrs =
      %{
        key: flag_key,
        description: proposed_flag["description"],
        flag_type: normalize_flag_type(proposed_flag["flag_type"]),
        value_type: normalize_value_type(proposed_flag["value_type"]),
        default_value: proposed_flag["default_value"] || %{},
        tags: proposed_flag["tags"] || []
      }

    case flag_by_key_query(flag_key) |> repo.one() do
      nil ->
        %Flag{}
        |> Flag.changeset(flag_attrs)
        |> repo.insert()

      flag ->
        flag
        |> Flag.changeset(flag_attrs)
        |> repo.update()
    end
  end

  defp upsert_import_flag_environment(
         repo,
         flag,
         target_environment,
         proposed_flag_environment,
         published_at
       ) do
    case fetch_existing_flag_environment_for_repo(repo, flag.id, target_environment.id) do
      nil ->
        %FlagEnvironment{}
        |> FlagEnvironment.changeset(%{
          flag_id: flag.id,
          environment_id: target_environment.id,
          status: normalize_flag_environment_status(proposed_flag_environment["status"]),
          kill_switch_variant_key: proposed_flag_environment["kill_switch_variant_key"],
          last_published_at: published_at,
          last_evaluated_at: proposed_flag_environment["last_evaluated_at"],
          variants_served: Map.get(proposed_flag_environment, "variants_served", %{})
        })
        |> repo.insert()

      %FlagEnvironment{} = flag_environment ->
        if flag_environment.status == :archived do
          {:error,
           StoreError.invalid_command("manifest import would revive an archived flag environment",
             metadata: %{flag_key: flag.key}
           )}
        else
          {:ok, flag_environment}
        end
    end
  end

  defp insert_import_ruleset(repo, flag_environment, proposed_ruleset, published_at) do
    %Ruleset{}
    |> Ruleset.changeset(%{
      flag_environment_id: flag_environment.id,
      version: next_ruleset_version_for_repo(repo, flag_environment.id),
      status: :published,
      salt: proposed_ruleset["salt"],
      published_at: published_at,
      metadata: Map.get(proposed_ruleset, "metadata", %{}),
      rules: Map.get(proposed_ruleset, "rules", [])
    })
    |> repo.insert()
  end

  defp finalize_import_flag_environment(
         repo,
         flag_environment,
         inserted_ruleset,
         proposed_flag_environment,
         published_at
       ) do
    flag_environment
    |> FlagEnvironment.changeset(%{
      active_ruleset_id: inserted_ruleset.id,
      status: normalize_flag_environment_status(proposed_flag_environment["status"]),
      kill_switch_variant_key: proposed_flag_environment["kill_switch_variant_key"],
      last_published_at: published_at,
      last_evaluated_at: proposed_flag_environment["last_evaluated_at"],
      variants_served: Map.get(proposed_flag_environment, "variants_served", %{})
    })
    |> repo.update()
  end

  defp fetch_existing_flag_environment_for_repo(repo, flag_id, environment_id) do
    from(fe in FlagEnvironment,
      where: fe.flag_id == ^flag_id and fe.environment_id == ^environment_id
    )
    |> repo.one()
  end

  defp normalize_flag_type(nil), do: :release

  defp normalize_flag_type(value) when is_binary(value) do
    try do
      parsed = String.to_existing_atom(value)
      if parsed in Flag.flag_types(), do: parsed, else: :release
    rescue
      ArgumentError -> :release
    end
  end

  defp normalize_flag_type(value) when is_atom(value) do
    if value in Flag.flag_types(), do: value, else: :release
  end

  defp normalize_flag_type(_value), do: :release

  defp normalize_value_type(nil), do: :boolean

  defp normalize_value_type(value) when is_binary(value) do
    try do
      parsed = String.to_existing_atom(value)
      if parsed in Flag.value_types(), do: parsed, else: :boolean
    rescue
      ArgumentError -> :boolean
    end
  end

  defp normalize_value_type(value) when is_atom(value) do
    if value in Flag.value_types(), do: value, else: :boolean
  end

  defp normalize_value_type(_value), do: :boolean

  defp next_ruleset_version_for_repo(repo, flag_environment_id) do
    from(ruleset in Ruleset,
      where: ruleset.flag_environment_id == ^flag_environment_id,
      select: max(ruleset.version)
    )
    |> repo.one()
    |> Kernel.||(0)
    |> Kernel.+(1)
  end

  defp next_environment_version(repo, environment_key) do
    from(version in EnvironmentVersion,
      where: version.environment_key == ^environment_key,
      select: max(version.version)
    )
    |> repo.one()
    |> Kernel.||(0)
    |> Kernel.+(1)
  end

  defp normalize_flag_environment_status(nil), do: :active

  defp normalize_flag_environment_status(status) when is_binary(status) do
    case String.to_existing_atom(status) do
      normalized ->
        if normalized in FlagEnvironment.statuses(), do: normalized, else: :active
    end
  rescue
    ArgumentError -> :active
  end

  defp normalize_flag_environment_status(status) when is_atom(status) do
    if status in FlagEnvironment.statuses(), do: status, else: :active
  end

  defp normalize_flag_environment_status(_status), do: :active

  defp denormalize_promoted_value(map) when is_map(map) do
    Map.new(map, fn {key, value} -> {key, denormalize_promoted_value(value)} end)
  end

  defp denormalize_promoted_value(list) when is_list(list) do
    Enum.map(list, &denormalize_promoted_value/1)
  end

  defp denormalize_promoted_value("nil"), do: nil
  defp denormalize_promoted_value("true"), do: true
  defp denormalize_promoted_value("false"), do: false
  defp denormalize_promoted_value(value), do: value

  defp insert_runtime_snapshot(repo, environment, published_at) do
    snapshot_payload = build_environment_snapshot_payload(repo, environment)
    payload = :erlang.term_to_binary(snapshot_payload)

    attrs = %{
      environment_key: environment.key,
      version: next_snapshot_version(repo, environment.key),
      payload: payload,
      payload_checksum: payload_checksum(payload),
      metadata: %{
        schema_version: @snapshot_schema_version,
        flag_count: map_size(snapshot_payload.flags)
      },
      published_at: published_at
    }

    %RuntimeSnapshot{}
    |> RuntimeSnapshot.changeset(attrs)
    |> repo.insert()
    |> case do
      {:ok, snapshot} ->
        Telemetry.execute(
          [:rulestead, :runtime, :snapshot, :published],
          %{count: 1},
          Telemetry.metadata(%{
            environment: environment.key,
            snapshot_version: snapshot.version,
            reason: :published
          })
        )

        :ok =
          Notifier.broadcast(
            Config.notifier(),
            %{environment_key: environment.key, snapshot_version: snapshot.version},
            pubsub: Config.pubsub(),
            pubsub_topic: Config.pubsub_topic()
          )

        {:ok, snapshot}

      {:error, %Changeset{} = changeset} ->
        {:error, changeset}
    end
  end

  defp build_environment_snapshot_payload(repo, environment) do
    flags =
      environment_snapshot_flags_query(environment.key)
      |> repo.all()
      |> Map.new(fn flag ->
        [flag_environment] = flag.flag_environments

        {flag.key, build_flag_payload(flag, environment, flag_environment, true)}
      end)

    %{
      schema_version: @snapshot_schema_version,
      environment_key: environment.key,
      generated_at: now(),
      flags: flags,
      audiences: compiled_audience_definitions(repo, environment)
    }
  end

  defp compiled_audience_definitions(repo, _environment) do
    Audience
    |> where([audience], is_nil(audience.archived_at))
    |> order_by([audience], asc: audience.key)
    |> repo.all()
    |> Map.new(fn audience ->
      {audience.key,
       %{
         definition: audience.definition,
         archived_at: audience.archived_at
       }}
    end)
  end

  defp next_snapshot_version(repo, environment_key) do
    from(snapshot in RuntimeSnapshot,
      where: snapshot.environment_key == ^environment_key,
      select: max(snapshot.version)
    )
    |> repo.one()
    |> Kernel.||(0)
    |> Kernel.+(1)
  end

  defp payload_checksum(payload) do
    :sha256
    |> :crypto.hash(payload)
    |> Base.encode16(case: :lower)
  end

  defp build_flag_payload(flag, environment, flag_environment, include_ruleset?) do
    %{
      flag: flag_summary(flag),
      environment: environment_summary(environment),
      flag_environment: flag_environment_summary(flag_environment),
      active_ruleset:
        if(include_ruleset?,
          do: runtime_ruleset_payload(active_ruleset_payload(flag_environment), flag_environment),
          else: nil
        ),
      draft_rulesets:
        if(include_ruleset?,
          do: draft_ruleset_payloads(flag_environment),
          else: []
        )
    }
  end

  defp build_flag_detail_payload(
         flag,
         environment,
         flag_environment,
         include_ruleset?,
         lifecycle_context
       ) do
    build_flag_payload(flag, environment, flag_environment, include_ruleset?)
    |> decorate_payload(flag, environment, flag_environment, lifecycle_context)
  end

  defp build_list_entry(entry, lifecycle_context) do
    entry
    |> entry_to_payload()
    |> decorate_payload(entry.flag, entry.environment, entry.flag_environment, lifecycle_context)
  end

  defp build_archive_payload(flag) do
    environment_keys =
      flag.flag_environments
      |> Enum.map(& &1.environment.key)
      |> Enum.sort()

    %{
      flag: flag_summary(flag),
      archived?: not is_nil(flag.archived_at),
      environment_keys: environment_keys
    }
  end

  defp build_create_payload(flag) do
    %{
      flag: flag_summary(flag),
      archived?: not is_nil(flag.archived_at),
      environment_keys: flag.flag_environments |> Enum.map(& &1.environment.key) |> Enum.sort(),
      environments:
        flag.flag_environments
        |> Enum.map(&environment_summary(&1.environment))
        |> Enum.sort_by(& &1.key),
      recent_owners: recent_owners(flag.ownership.owner_ref)
    }
  end

  defp build_update_payload(flag, previous_owner) do
    {environment, flag_environment} = preferred_environment(flag)
    lifecycle_context = lifecycle_context_for_flags([flag.key])

    build_flag_detail_payload(flag, environment, flag_environment, true, lifecycle_context)
    |> Map.put(:recent_owners, recent_owners(flag.ownership.owner_ref, previous_owner))
  end

  defp active_ruleset_payload(%FlagEnvironment{active_ruleset: nil}), do: nil

  defp active_ruleset_payload(%FlagEnvironment{active_ruleset: ruleset}),
    do: serialize_ruleset(ruleset)

  defp draft_ruleset_payloads(%FlagEnvironment{id: id}) do
    from(r in Ruleset,
      where: r.flag_environment_id == ^id and r.status == :draft,
      order_by: [desc: r.version]
    )
    |> Repo.all()
    |> Enum.map(&serialize_ruleset/1)
  end

  defp flag_summary(flag) do
    Map.take(flag, [
      :id,
      :key,
      :description,
      :flag_type,
      :value_type,
      :default_value,
      :ownership,
      :lifecycle,
      :tags,
      :archived_at,
      :inserted_at,
      :updated_at
    ])
  end

  defp environment_summary(environment) do
    Map.take(environment, [:id, :key, :name, :description, :inserted_at, :updated_at])
  end

  defp audience_summary(audience) do
    Map.take(audience, [
      :id,
      :key,
      :description,
      :definition,
      :archived_at,
      :inserted_at,
      :updated_at
    ])
  end

  defp audience_preview_payload(repo, environment_key, audience, command, opts \\ []) do
    affected_references =
      AudienceDependencies.summarize(
        audience.key,
        audience_reference_payloads(repo, environment_key)
      )

    resolver_opts = Keyword.get(opts, :preview_evidence_resolver_opts, [])

    with {:ok, attrs} <-
           assemble_preview_evidence_attrs(
             environment_key,
             audience,
             command,
             affected_references,
             resolver_opts
           ) do
      {:ok, ImpactPreview.build(attrs)}
    end
  end

  defp assemble_preview_evidence_attrs(
         environment_key,
         audience,
         command,
         affected_references,
         resolver_opts
       ) do
    before_definition = command.before_definition || audience.definition

    after_definition =
      case command.operation do
        "update" -> command.after_definition || audience.definition
        _other -> command.after_definition
      end

    tenant_key = command.tenant_key || Map.get(audience, :tenant_key)
    command_samples = Map.get(command, :samples, [])
    explicit_basis = Map.get(command, :preview_basis)

    base_attrs = %{
      environment_key: environment_key,
      tenant_key: tenant_key,
      audience_key: audience.key,
      operation: command.operation,
      before_definition: before_definition,
      after_definition: after_definition,
      affected_references: affected_references
    }

    query =
      Map.merge(base_attrs, %{
        affected_reference_keys: AudienceDependencies.reference_keys(affected_references)
      })

    case PreviewEvidence.resolver_module(resolver_opts) do
      nil ->
        {:ok,
         Map.merge(base_attrs, %{
           samples: command_samples,
           impression_summary: %{},
           preview_basis: explicit_basis || "authored_state_and_explicit_samples"
         })}

      _resolver ->
        case PreviewEvidence.resolve(query, resolver_opts) do
          {:ok, evidence} ->
            merged_samples =
              Limits.merge_samples(
                command_samples,
                Map.get(evidence, :samples, []),
                resolver_opts
              )

            impression_summary = Map.get(evidence, :impression_summary, %{})

            preview_basis =
              cond do
                explicit_basis ->
                  explicit_basis

                preview_evidence_present?(merged_samples, impression_summary) ->
                  "authored_state_with_host_evidence"

                true ->
                  "authored_state_host_evidence_unavailable"
              end

            {:ok,
             Map.merge(base_attrs, %{
               samples: merged_samples,
               impression_summary: impression_summary,
               preview_basis: preview_basis
             })}

          {:error, %Rulestead.Error{metadata: %{reason: :empty}}} ->
            {:ok,
             Map.merge(base_attrs, %{
               samples: command_samples,
               impression_summary: %{},
               preview_basis: explicit_basis || "authored_state_host_evidence_unavailable"
             })}

          {:error, _} = error ->
            error
        end
    end
  end

  defp preview_evidence_present?(samples, impression_summary) do
    Enum.any?(List.wrap(samples)) or map_size(impression_summary) > 0
  end

  defp audience_reference_payloads(repo, environment_key) do
    environment_snapshot_flags_query(environment_key)
    |> repo.all()
    |> Enum.map(fn flag ->
      [flag_environment] = flag.flag_environments
      environment = flag_environment.environment

      %{
        flag: flag_summary(flag),
        environment_key: environment.key,
        tenant_key: "global",
        active_ruleset: active_ruleset_payload(flag_environment),
        lifecycle: plain_struct_map(flag.lifecycle)
      }
    end)
  end

  defp refresh_audience_reference_projection(environment_key) do
    Repo.transaction(fn repo ->
      environments =
        case normalize_projection_environment_key(environment_key) do
          nil -> projection_environment_keys(repo)
          key -> [key]
        end

      deleted_rows =
        repo.delete_all(
          from(projection in AudienceReferenceProjection,
            where: projection.environment_key in ^environments
          )
        )
        |> elem(0)

      inserted_rows = insert_projection_rows(repo, environments)

      %{deleted_rows: deleted_rows, inserted_rows: inserted_rows}
    end)
    |> case do
      {:ok, result} ->
        {:ok, result}

      {:error, reason} ->
        {:error, StoreError.unavailable(cause: reason)}
    end
  end

  defp projection_environment_keys(repo) do
    from(environment in Environment, select: environment.key)
    |> repo.all()
    |> Enum.sort()
  end

  defp insert_projection_rows(_repo, []), do: 0

  defp insert_projection_rows(repo, environments) do
    environments
    |> Enum.flat_map(fn environment_key ->
      repo
      |> audience_reference_payloads(environment_key)
      |> projection_entries_from_payloads()
    end)
    |> Enum.reduce(0, fn attrs, inserted ->
      changeset = AudienceReferenceProjection.changeset(%AudienceReferenceProjection{}, attrs)

      case repo.insert(changeset) do
        {:ok, _projection} -> inserted + 1
        {:error, _changeset} -> inserted
      end
    end)
  end

  defp projection_entries_from_payloads(payloads) do
    payloads
    |> Enum.flat_map(&payload_projection_entries/1)
    |> Enum.reduce(%{}, fn entry, acc ->
      key = projection_identity_key(entry)
      Map.update(acc, key, entry, &Map.update!(&1, :reference_count, fn count -> count + 1 end))
    end)
    |> Map.values()
    |> DependencyInventory.sort_entries()
    |> Enum.map(&projection_entry_attrs/1)
  end

  defp payload_projection_entries(payload) do
    payload
    |> projection_rules()
    |> Enum.flat_map(fn rule ->
      if projection_rule_strategy(rule) == "segment_match" do
        [
          DependencyInventory.normalize_entry(%{
            environment_key: payload.environment_key,
            tenant_key: payload.tenant_key || "global",
            flag_key: get_in(payload, [:flag, :key]),
            ruleset_version: get_in(payload, [:active_ruleset, :version]),
            rule_key: projection_rule_key(rule),
            audience_key: projection_rule_audience_key(rule),
            ruleset_status: get_in(payload, [:active_ruleset, :status]),
            rollout_context: projection_rollout_context(rule),
            lifecycle_context: payload.lifecycle,
            visibility: %{status: "visible"},
            reference_count: 1,
            hidden_reference_count: 0
          })
        ]
      else
        []
      end
    end)
    |> Enum.reject(& &1.malformed?)
  end

  defp projection_rules(%{active_ruleset: %{rules: rules}}) when is_list(rules), do: rules
  defp projection_rules(_payload), do: []

  defp projection_rule_strategy(rule) do
    rule
    |> Map.get(:strategy, Map.get(rule, "strategy"))
    |> to_string()
  end

  defp projection_rule_key(rule) do
    rule
    |> Map.get(:key, Map.get(rule, "key"))
    |> to_string()
  end

  defp projection_rule_audience_key(rule) do
    case Map.get(rule, :audience_key, Map.get(rule, "audience_key")) do
      nil -> nil
      value -> to_string(value)
    end
  end

  defp projection_rollout_context(rule) do
    case Map.get(rule, :rollout, Map.get(rule, "rollout")) do
      rollout when is_map(rollout) -> plain_struct_map(rollout)
      _other -> %{}
    end
  end

  defp projection_identity_key(entry) do
    {
      entry.environment_key,
      entry.tenant_key,
      entry.flag_key,
      entry.ruleset_version,
      entry.rule_key,
      entry.audience_key
    }
  end

  defp projection_entry_attrs(entry) do
    %{
      environment_key: entry.environment_key,
      tenant_key: entry.tenant_key,
      audience_key: entry.audience_key,
      flag_key: entry.flag_key,
      ruleset_version: entry.ruleset_version,
      rule_key: entry.rule_key,
      rule_strategy: "segment_match",
      ruleset_status: entry.ruleset_status,
      rollout_context: entry.rollout_context,
      lifecycle_context: entry.lifecycle_context,
      visibility: entry.visibility,
      reference_count: entry.reference_count,
      hidden_reference_count: entry.hidden_reference_count
    }
  end

  defp projection_row_to_entry(%AudienceReferenceProjection{} = projection) do
    DependencyInventory.normalize_entry(%{
      environment_key: projection.environment_key,
      tenant_key: projection.tenant_key,
      audience_key: projection.audience_key,
      flag_key: projection.flag_key,
      ruleset_version: projection.ruleset_version,
      rule_key: projection.rule_key,
      ruleset_status: projection.ruleset_status,
      rollout_context: projection.rollout_context,
      lifecycle_context: projection.lifecycle_context,
      visibility: projection.visibility,
      reference_count: projection.reference_count,
      hidden_reference_count: projection.hidden_reference_count
    })
  end

  defp publish_dependency_entries(environment_key, command, flag, ruleset) do
    tenant_key = publish_scope_tenant(command)
    flag_key = to_string(flag.key)

    ruleset_dependency_entries(
      environment_key,
      tenant_key,
      flag_key,
      ruleset
    )
  end

  defp ruleset_dependency_entries(environment_key, tenant_key, flag_key, ruleset) do
    rules = Map.get(ruleset, :rules) || Map.get(ruleset, "rules") || []
    ruleset_version = Map.get(ruleset, :version) || Map.get(ruleset, "version")
    ruleset_status = Map.get(ruleset, :status) || Map.get(ruleset, "status")

    rules
    |> Enum.flat_map(fn rule ->
      if projection_rule_strategy(rule) == "segment_match" do
        [
          DependencyInventory.normalize_entry(%{
            environment_key: environment_key,
            tenant_key: tenant_key || "global",
            audience_key: projection_rule_audience_key(rule),
            flag_key: flag_key,
            ruleset_version: ruleset_version,
            rule_key: projection_rule_key(rule),
            ruleset_status: ruleset_status,
            rollout_context: projection_rollout_context(rule),
            lifecycle_context: %{available?: false},
            visibility: %{status: "visible"},
            reference_count: 1,
            hidden_reference_count: 0,
            audience_schema_version: rule_audience_schema_version(rule),
            audience_version_hash: rule_audience_version_hash(rule)
          })
        ]
      else
        []
      end
    end)
    |> Enum.reject(&(&1.malformed? or is_nil(&1.audience_key)))
    |> DependencyInventory.sort_entries()
  end

  defp validate_dependency_entries(command, dependency_entries, opts \\ []) do
    findings =
      DependencyValidator.validate(
        %{
          tenant_key: publish_scope_tenant(command),
          audiences: dependency_validation_audiences(),
          expected_reference_keys: Keyword.get(opts, :expected_reference_keys),
          stale_reference_keys: Keyword.get(opts, :stale_reference_keys)
        },
        dependency_entries
      )

    DependencyValidator.sort_findings(findings)
  end

  defp dependency_validation_audiences do
    from(audience in Audience)
    |> Repo.all()
    |> Map.new(fn audience ->
      {audience.key,
       %{
         key: audience.key,
         tenant_key: Map.get(audience, :tenant_key),
         archived_at: audience.archived_at,
         definition: audience.definition
       }}
    end)
  end

  defp publish_scope_tenant(command) do
    command
    |> Command.GovernanceSupport.tenant_provenance()
    |> Map.get("tenant_key")
  end

  defp rule_audience_schema_version(rule) do
    metadata = rule_metadata(rule)

    Map.get(rule, :audience_schema_version) ||
      Map.get(rule, "audience_schema_version") ||
      Map.get(metadata, :audience_schema_version) ||
      Map.get(metadata, "audience_schema_version")
  end

  defp rule_audience_version_hash(rule) do
    metadata = rule_metadata(rule)

    Map.get(rule, :audience_version_hash) ||
      Map.get(rule, "audience_version_hash") ||
      Map.get(metadata, :audience_version_hash) ||
      Map.get(metadata, "audience_version_hash")
  end

  defp rule_metadata(%_{} = rule), do: rule |> Map.from_struct() |> rule_metadata()

  defp rule_metadata(rule) when is_map(rule) do
    Map.get(rule, :metadata) || Map.get(rule, "metadata") || %{}
  end

  defp rule_metadata(_rule), do: %{}

  defp insert_blocked_publish_event(command, ruleset, findings) do
    metadata = %{
      "version" => Map.get(ruleset, :version),
      "blockers" => dependency_blockers(findings),
      "dependency_findings" => serialize_dependency_findings(findings)
    }

    %AuditEvent{}
    |> audit_event_changeset(command, "ruleset.publish_blocked", :error, %{metadata: metadata})
    |> Repo.insert()
  end

  defp dependency_blockers(findings) when is_list(findings) do
    Enum.map(findings, fn finding ->
      %{
        "code" => to_string(Map.get(finding, :code)),
        "environment_key" => Map.get(finding, :environment_key),
        "tenant_key" => Map.get(finding, :tenant_key),
        "flag_key" => Map.get(finding, :flag_key),
        "ruleset_version" => Map.get(finding, :ruleset_version),
        "rule_key" => Map.get(finding, :rule_key),
        "audience_key" => Map.get(finding, :audience_key)
      }
    end)
  end

  defp dependency_blockers(_findings), do: []

  defp serialize_dependency_findings(findings) when is_list(findings) do
    Enum.map(findings, fn finding ->
      %{
        "code" => to_string(Map.get(finding, :code)),
        "severity" => to_string(Map.get(finding, :severity)),
        "message" => Map.get(finding, :message),
        "environment_key" => Map.get(finding, :environment_key),
        "tenant_key" => Map.get(finding, :tenant_key),
        "audience_key" => Map.get(finding, :audience_key),
        "flag_key" => Map.get(finding, :flag_key),
        "ruleset_version" => Map.get(finding, :ruleset_version),
        "rule_key" => Map.get(finding, :rule_key)
      }
    end)
  end

  defp serialize_dependency_findings(_findings), do: []

  defp maybe_filter_projection_scope(query, command) do
    query
    |> maybe_filter_projection_environment(command)
    |> maybe_filter_projection_tenant(command)
    |> maybe_filter_projection_audience(command)
  end

  defp maybe_filter_projection_environment(query, command) do
    case normalize_projection_environment_key(Map.get(command, :environment_key)) do
      nil ->
        query

      environment_key ->
        where(query, [projection], projection.environment_key == ^environment_key)
    end
  end

  defp maybe_filter_projection_tenant(query, command) do
    case normalize_projection_tenant_key(Map.get(command, :tenant_key)) do
      nil -> query
      tenant_key -> where(query, [projection], projection.tenant_key == ^tenant_key)
    end
  end

  defp maybe_filter_projection_audience(query, command) do
    case normalize_projection_audience_key(Map.get(command, :audience_key)) do
      nil -> query
      audience_key -> where(query, [projection], projection.audience_key == ^audience_key)
    end
  end

  defp command_limit(command) do
    case Map.get(command, :limit) do
      value when is_integer(value) and value > 0 -> value
      _other -> 50
    end
  end

  defp command_offset(command) do
    case Map.get(command, :offset) do
      value when is_integer(value) and value >= 0 -> value
      _other -> 0
    end
  end

  defp normalize_projection_environment_key(nil), do: nil
  defp normalize_projection_environment_key(environment_key), do: to_string(environment_key)

  defp normalize_projection_tenant_key(nil), do: nil
  defp normalize_projection_tenant_key(tenant_key), do: to_string(tenant_key)

  defp normalize_projection_audience_key(nil), do: nil
  defp normalize_projection_audience_key(audience_key), do: to_string(audience_key)

  defp list_audience_dependencies_command(%Command.ListAudienceDependencies{} = command),
    do: command

  defp list_audience_dependencies_command(command) do
    Command.ListAudienceDependencies.new(
      environment_key: Map.get(command, :environment_key) || Map.get(command, "environment_key"),
      tenant_key: Map.get(command, :tenant_key) || Map.get(command, "tenant_key"),
      audience_key: Map.get(command, :audience_key) || Map.get(command, "audience_key"),
      limit: Map.get(command, :limit) || Map.get(command, "limit"),
      offset: Map.get(command, :offset) || Map.get(command, "offset"),
      actor: Map.get(command, :actor) || Map.get(command, "actor"),
      include_redacted_placeholders?:
        Map.get(command, :include_redacted_placeholders?) ||
          Map.get(command, "include_redacted_placeholders?"),
      visible_audience_keys:
        Map.get(command, :visible_audience_keys) || Map.get(command, "visible_audience_keys")
    )
  end

  defp redaction_options(command) do
    base = [
      visible_audience_keys: Map.get(command, :visible_audience_keys),
      include_redacted_placeholders?: Map.get(command, :include_redacted_placeholders?, false)
    ]

    case Map.get(command, :actor) do
      actor when is_map(actor) ->
        Keyword.put(
          base,
          :visibility_resolver,
          Rulestead.Admin.DependencyVisibility.visibility_resolver(actor)
        )

      _ ->
        base
    end
  end

  defp plain_struct_map(%_{} = value), do: value |> Map.from_struct() |> plain_struct_map()

  defp plain_struct_map(value) when is_map(value) do
    Map.new(value, fn {key, nested} -> {key, plain_struct_map(nested)} end)
  end

  defp plain_struct_map(value) when is_list(value), do: Enum.map(value, &plain_struct_map/1)
  defp plain_struct_map(value), do: value

  defp fetch_audience_for_mutation(audience_key) do
    case Repo.get_by(Audience, key: to_string(audience_key)) do
      nil -> {:error, StoreError.invalid_command("audience was not found")}
      audience -> {:ok, audience}
    end
  end

  defp ensure_audience_active(%Audience{archived_at: nil}), do: :ok

  defp ensure_audience_active(%Audience{}),
    do: {:error, StoreError.invalid_command("audience is archived")}

  defp ensure_supported_audience_operation(%Command.ApplyAudienceMutation{
         operation: "delete_attempt"
       }) do
    {:error, StoreError.invalid_command("audience_delete_unsupported")}
  end

  defp ensure_supported_audience_operation(%Command.ApplyAudienceMutation{operation: operation})
       when operation in ["update", "archive"],
       do: :ok

  defp ensure_supported_audience_operation(%Command.ApplyAudienceMutation{} = command),
    do:
      {:error, StoreError.invalid_command("unsupported audience operation: #{command.operation}")}

  defp ensure_audience_preview_schema(%Command.ApplyAudienceMutation{
         preview_schema_version: version
       }) do
    if version == ImpactPreview.schema_version() do
      :ok
    else
      {:error, StoreError.invalid_command("audience preview schema version is incompatible")}
    end
  end

  defp ensure_fresh_audience_preview(command, current_preview) do
    if command.preview_fingerprint == current_preview.preview_fingerprint do
      :ok
    else
      {:error,
       StoreError.invalid_command(
         "audience preview is stale",
         metadata: %{
           audience_key: command.audience_key,
           expected_preview_fingerprint: current_preview.preview_fingerprint,
           preview_fingerprint: command.preview_fingerprint
         }
       )}
    end
  end

  defp validate_blast_radius_threshold(command, preview, opts) do
    dependency_entries = audience_dependency_entries(preview, command)

    BlastRadiusThreshold.validate_protected_apply(
      command,
      preview,
      Keyword.merge(
        [dependency_entries: dependency_entries, audiences: dependency_validation_audiences()],
        opts
      )
    )
  end

  defp apply_audience_operation(
         repo,
         %Audience{} = audience,
         %Command.ApplyAudienceMutation{operation: "update"} = command,
         published_at
       ) do
    audience
    |> Audience.changeset(%{
      definition: command.after_definition || audience.definition,
      description: command.metadata["description"] || audience.description
    })
    |> Changeset.change(updated_at: published_at)
    |> repo.update()
  end

  defp apply_audience_operation(
         repo,
         %Audience{} = audience,
         %Command.ApplyAudienceMutation{operation: "archive"},
         published_at
       ) do
    audience
    |> Audience.changeset(%{archived_at: published_at})
    |> Changeset.change(updated_at: published_at)
    |> repo.update()
  end

  defp audience_event_type("archive"), do: "audience.archived"
  defp audience_event_type("delete_attempt"), do: "audience.delete_blocked"
  defp audience_event_type(_operation), do: "audience.updated"

  defp audience_audit_state(%Audience{} = audience) do
    %{
      "key" => audience.key,
      "definition" => audience.definition,
      "archived_at" => audience.archived_at && DateTime.to_iso8601(audience.archived_at)
    }
  end

  defp audience_audit_event_changeset(audit_event, command, event_type, result, opts) do
    preview = Map.get(opts, :preview) || %{}
    evidence_summary = ImpactPreview.audit_evidence_summary(preview)

    metadata =
      AuditEvent.metadata(%{
        before: Map.get(opts, :before, %{}),
        after: Map.get(opts, :after, %{}),
        tenant: Command.GovernanceSupport.tenant_provenance(command),
        context: Map.get(command, :metadata, %{}),
        request_id: correlation_id(command),
        preview_fingerprint: command.preview_fingerprint,
        preview_schema_version: command.preview_schema_version,
        affected_reference_keys: audience_audit_reference_keys(command, opts),
        preview_basis: command.preview_basis,
        blockers: Map.get(opts, :blockers),
        dependency_findings: Map.get(opts, :dependency_findings, []),
        affected_references: Map.get(preview, :affected_references),
        uncertainty: Map.get(preview, :uncertainty),
        sample_evidence: Map.get(preview, :sample_evidence),
        impression_evidence: Map.get(preview, :impression_evidence)
      })
      |> Map.merge(evidence_summary)
      |> Map.merge(Map.new(Map.get(opts, :metadata, %{})))

    AuditEvent.changeset(audit_event, %{
      event_type: event_type,
      resource_type: "audience",
      resource_key: command.audience_key,
      environment_key: Map.get(opts, :environment_key, command.environment_key),
      actor_id: actor_value(command.actor, "id"),
      actor_type: to_string(actor_value(command.actor, "type") || "operator"),
      actor_display: actor_value(command.actor, "display"),
      reason: command.reason,
      result: result,
      metadata: metadata,
      correlation_id: correlation_id(command),
      occurred_at: now()
    })
  end

  defp insert_audience_audit_only_event(command, event_type, result) do
    %AuditEvent{}
    |> audience_audit_event_changeset(command, event_type, result, %{
      blockers: audience_blockers(command, result)
    })
    |> Repo.insert()
    |> case do
      {:ok, audit_event} ->
        {:ok, %{audit_event: AuditEvent.serialize(audit_event)}}

      {:error, %Changeset{} = changeset} ->
        {:error,
         StoreError.invalid_command("audience audit could not be persisted", cause: changeset)}
    end
  end

  defp insert_blocked_audience_event(command, %Rulestead.Error{} = error, opts) do
    event_type =
      if command.operation == "delete_attempt" do
        "audience.delete_blocked"
      else
        "audience.mutation_blocked"
      end

    preview =
      case Keyword.get(opts, :preview) do
        preview when is_map(preview) and map_size(preview) > 0 -> preview
        _ -> preview_for_blocked_audience_audit(command, error)
      end

    %AuditEvent{}
    |> audience_audit_event_changeset(command, event_type, :error, %{
      blockers: audience_blockers(command, :error, error),
      preview: preview,
      metadata: blocked_audience_metadata(error)
    })
    |> Repo.insert()
  end

  defp preview_for_blocked_audience_audit(command, %Rulestead.Error{metadata: metadata}) do
    verdict = Map.get(metadata, :verdict) || Map.get(metadata, "verdict")

    if verdict in ["above_threshold", "indeterminate"] do
      with {:ok, environment} <- fetch_environment(command.environment_key),
           {:ok, audience} <- fetch_audience_for_mutation(command.audience_key),
           {:ok, preview} <- audience_preview_payload(Repo, environment.key, audience, command) do
        preview
      else
        _ -> %{}
      end
    else
      %{}
    end
  end

  defp preview_for_blocked_audience_audit(_command, _error), do: %{}

  defp blocked_audience_metadata(%Rulestead.Error{metadata: %{verdict: verdict}} = error)
       when verdict in ["above_threshold", "indeterminate"] do
    %{
      "blast_radius_verdict" => verdict,
      "blast_radius_reference_count" => Map.get(error.metadata, :reference_count),
      "blast_radius_breach_reasons" => blast_radius_breach_reasons(error),
      "dependency_findings" => serialize_dependency_findings(error.details)
    }
  end

  defp blocked_audience_metadata(error) do
    %{"dependency_findings" => serialize_dependency_findings(error.details)}
  end

  defp blast_radius_breach_reasons(%Rulestead.Error{cause: %{breach_reasons: reasons}})
       when is_list(reasons) do
    Enum.map(reasons, fn reason ->
      %{
        "code" => Map.get(reason, :code),
        "observed" => Map.get(reason, :observed),
        "limit" => Map.get(reason, :limit),
        "remediation" => Map.get(reason, :remediation)
      }
    end)
  end

  defp blast_radius_breach_reasons(_error), do: []

  defp audience_audit_reference_keys(command, opts) do
    case Map.get(opts, :preview) do
      preview when is_map(preview) and map_size(preview) > 0 ->
        case preview_reference_keys(preview) do
          [] -> command.affected_reference_keys || []
          keys -> keys
        end

      _ ->
        command.affected_reference_keys || []
    end
  end

  defp preview_reference_keys(%{affected_references: affected_references}) do
    AudienceDependencies.reference_keys(affected_references)
  end

  defp preview_reference_keys(_preview), do: []

  defp audience_dependency_entries(preview, command) do
    preview
    |> Map.get(:affected_references, [])
    |> Enum.map(fn reference ->
      DependencyInventory.normalize_entry(%{
        environment_key: reference_value(reference, :environment_key) || command.environment_key,
        tenant_key: command.tenant_key || reference_value(reference, :tenant_key) || "global",
        audience_key: command.audience_key,
        flag_key: reference_value(reference, :flag_key),
        ruleset_version: reference_value(reference, :ruleset_version),
        rule_key: reference_value(reference, :rule_key),
        ruleset_status: reference_value(reference, :ruleset_status),
        rollout_context: reference_value(reference, :rollout_context),
        lifecycle_context: reference_value(reference, :lifecycle_context),
        visibility: %{status: "visible"},
        reference_count: 1,
        hidden_reference_count: 0
      })
    end)
    |> Enum.reject(&(&1.malformed? or is_nil(&1.audience_key)))
    |> DependencyInventory.sort_entries()
  end

  defp reference_value(reference, key) when is_map(reference) do
    Map.get(reference, key) || Map.get(reference, Atom.to_string(key))
  end

  defp reference_value(_reference, _key), do: nil

  defp audience_blockers(command, :denied),
    do: [%{"code" => "audience_mutation_denied", "operation" => command.operation}]

  defp audience_blockers(command, _result),
    do: [%{"code" => "audience_mutation_#{command.operation}_blocked"}]

  defp audience_blockers(_command, :error, %Rulestead.Error{details: details, message: message})
       when is_list(details) do
    dependency_codes =
      details
      |> Enum.map(fn detail -> Map.get(detail, :code) || Map.get(detail, "code") end)
      |> Enum.reject(&is_nil/1)

    if dependency_codes == [] do
      [%{"code" => audience_blocker_code(message)}]
    else
      serialize_dependency_findings(details)
    end
  end

  defp audience_blockers(_command, :error, %Rulestead.Error{message: message}) do
    [%{"code" => audience_blocker_code(message)}]
  end

  defp audience_blocker_code(message) when is_binary(message) do
    cond do
      String.contains?(message, "stale") ->
        "audience_preview_stale"

      String.contains?(message, "not found") ->
        "audience_missing"

      String.contains?(message, "archived") ->
        "audience_archived"

      String.contains?(message, "direct-apply limit for protected environments") ->
        "blast_radius_above_threshold"

      String.contains?(message, "Blast radius cannot be evaluated safely") ->
        "blast_radius_indeterminate"

      String.contains?(message, "audience_delete_unsupported") ->
        "audience_delete_unsupported"

      String.contains?(message, "schema") ->
        "audience_preview_schema_incompatible"

      true ->
        "audience_mutation_blocked"
    end
  end

  defp flag_environment_summary(flag_environment) do
    %{
      id: flag_environment.id,
      environment_key: flag_environment.environment.key,
      status: flag_environment.status,
      kill_switch_variant_key: flag_environment.kill_switch_variant_key,
      active_ruleset_version:
        if(flag_environment.active_ruleset, do: flag_environment.active_ruleset.version),
      last_published_at: flag_environment.last_published_at,
      last_evaluated_at: flag_environment.last_evaluated_at,
      inserted_at: flag_environment.inserted_at,
      updated_at: flag_environment.updated_at
    }
  end

  defp entry_to_payload(entry) do
    %{
      flag: flag_summary(entry.flag),
      environment: environment_summary(entry.environment),
      flag_environment: flag_environment_summary(entry.flag_environment),
      active_ruleset: active_ruleset_payload(entry.flag_environment),
      draft_rulesets: draft_ruleset_payloads(entry.flag_environment)
    }
  end

  defp serialize_ruleset(ruleset) do
    %{
      id: ruleset.id,
      flag_environment_id: ruleset.flag_environment_id,
      version: ruleset.version,
      status: ruleset.status,
      salt: ruleset.salt,
      published_at: ruleset.published_at,
      metadata: ruleset.metadata,
      rules: Enum.map(ruleset.rules, &serialize_rule/1),
      inserted_at: ruleset.inserted_at,
      updated_at: ruleset.updated_at
    }
  end

  defp serialize_rule(rule) when is_map(rule) do
    %{
      key: rule.key,
      name: rule.name,
      description: rule.description,
      strategy: rule.strategy,
      value: rule.value,
      audience_id: rule.audience_id,
      audience_key: rule.audience_key,
      conditions: Enum.map(rule.conditions || [], &serialize_condition/1),
      variants: Enum.map(rule.variants || [], &serialize_variant/1),
      rollout: serialize_rollout(rule.rollout)
    }
  end

  defp serialize_condition(condition) when is_map(condition) do
    %{
      attribute: condition.attribute,
      operator: condition.operator,
      value: condition.value
    }
  end

  defp serialize_variant(variant) when is_map(variant) do
    %{
      key: variant.key,
      value: variant.value,
      weight: variant.weight
    }
  end

  defp serialize_rollout(nil), do: nil

  defp serialize_rollout(rollout) when is_map(rollout) do
    %{
      bucket_by: rollout.bucket_by,
      percentage: rollout.percentage,
      salt: rollout.salt,
      guardrails: Enum.map(rollout.guardrails || [], &serialize_guardrail/1)
    }
  end

  defp serialize_guardrail(guardrail) when is_map(guardrail) do
    %{
      signal_key: guardrail.signal_key,
      threshold_operator: guardrail.threshold_operator,
      threshold_value: guardrail.threshold_value,
      freshness_window_seconds: guardrail.freshness_window_seconds,
      min_sample_size: guardrail.min_sample_size,
      environment_scope: guardrail.environment_scope,
      tenant_scope: guardrail.tenant_scope
    }
  end

  defp serialize_runtime_snapshot(snapshot) do
    %{
      id: snapshot.id,
      environment_key: snapshot.environment_key,
      version: snapshot.version,
      payload: snapshot.payload,
      payload_checksum: snapshot.payload_checksum,
      metadata: normalize_metadata(snapshot.metadata),
      published_at: snapshot.published_at,
      inserted_at: snapshot.inserted_at,
      updated_at: snapshot.updated_at
    }
  end

  defp normalize_metadata(metadata) when is_map(metadata) do
    %{
      schema_version: metadata[:schema_version] || metadata["schema_version"],
      flag_count: metadata[:flag_count] || metadata["flag_count"]
    }
  end

  defp normalize_metadata(_metadata), do: %{}

  defp list_environment_filter(nil), do: {:ok, nil}

  defp list_environment_filter(environment_key) do
    case Repo.get_by(Environment, key: to_string(environment_key)) do
      nil -> {:error, StoreError.environment_not_found(environment_key)}
      environment -> {:ok, environment.key}
    end
  end

  defp maybe_filter_environment(query, nil), do: query

  defp maybe_filter_environment(query, environment_key),
    do: where(query, [_, _, env], env.key == ^environment_key)

  defp maybe_filter_archived(query, true), do: query

  defp maybe_filter_archived(query, false),
    do: where(query, [flag, _, _], is_nil(flag.archived_at))

  defp maybe_filter_query(entries, nil), do: entries
  defp maybe_filter_query(entries, ""), do: entries

  defp maybe_filter_query(entries, search) do
    terms = query_terms(search)

    if terms == [] do
      entries
    else
      Enum.filter(entries, &entry_matches_query?(&1, terms))
    end
  end

  defp entry_matches_query?(entry, terms) do
    Enum.all?(terms, fn term ->
      entry_matches_query_term?(entry, term)
    end)
  end

  defp entry_matches_query_term?(entry, {:key, term}) do
    entry.flag.key
    |> normalized_search_value()
    |> String.contains?(term)
  end

  defp entry_matches_query_term?(entry, {:owner, term}) do
    ownership = entry.flag.ownership || %{}

    [ownership.owner_ref, ownership.owner_display]
    |> Enum.map(&normalized_search_value/1)
    |> Enum.any?(&String.contains?(&1, term))
  end

  defp entry_matches_query_term?(entry, {:tag, term}) do
    entry.flag.tags
    |> List.wrap()
    |> Enum.map(&normalized_search_value/1)
    |> Enum.any?(&String.contains?(&1, term))
  end

  defp entry_matches_query_term?(entry, {:any, term}) do
    entry
    |> searchable_entry_values()
    |> Enum.any?(&String.contains?(&1, term))
  end

  defp searchable_entry_values(entry) do
    ownership = entry.flag.ownership || %{}

    ([entry.flag.key, entry.flag.description, ownership.owner_ref, ownership.owner_display] ++
       (entry.flag.tags || []))
    |> Enum.map(&normalized_search_value/1)
  end

  defp normalized_search_value(value), do: value |> to_string() |> String.downcase()

  defp query_term(term) do
    case String.split(term, ":", parts: 2) do
      ["key", value] when value != "" ->
        {:key, value}

      ["owner", value] when value != "" ->
        {:owner, value}

      ["tag", value] when value != "" ->
        {:tag, value}

      _other ->
        {:any, term}
    end
  end

  defp query_terms(search) do
    search
    |> to_string()
    |> String.downcase()
    |> String.split([",", " "])
    |> Enum.map(&String.trim/1)
    |> Enum.reject(&(&1 == ""))
    |> Enum.uniq()
    |> Enum.map(&query_term/1)
  end

  defp maybe_filter_change_request(query, _field, nil), do: query

  defp maybe_filter_change_request(query, field_name, value) do
    where(query, [cr], field(cr, ^field_name) == ^value)
  end

  defp normalize_change_request_filter(nil), do: nil
  defp normalize_change_request_filter(value) when is_atom(value), do: Atom.to_string(value)
  defp normalize_change_request_filter(value) when is_binary(value), do: String.trim(value)
  defp normalize_change_request_filter(_value), do: nil

  defp ruleset_error(changeset, flag_key, environment_key) do
    details = collect_changeset_details(changeset)

    opts = [
      metadata: %{flag_key: to_string(flag_key), environment_key: to_string(environment_key)},
      details: details,
      cause: changeset
    ]

    if Enum.any?(details, &(&1[:message] == "weights must sum to 100")) do
      RulesetError.invalid_variant_weights(opts)
    else
      RulesetError.invalid(opts)
    end
  end

  defp store_changeset_error(changeset, flag_key, environment_key) do
    StoreError.invalid_command(
      "store command is invalid",
      metadata: %{flag_key: to_string(flag_key), environment_key: to_string(environment_key)},
      details: collect_changeset_details(changeset),
      cause: changeset
    )
  end

  defp auto_advance_policy_attrs(%Command.UpsertRolloutAutoAdvancePolicy{} = command) do
    %{
      flag_key: command.flag_key,
      environment_key: command.environment_key,
      rule_key: command.rule_key,
      enabled: command.enabled,
      observation_window_seconds: command.observation_window_seconds,
      next_stage: command.next_stage,
      next_percentage: command.next_percentage,
      metadata: command.metadata
    }
  end

  defp serialize_rollout_auto_advance_policy(%RolloutAutoAdvancePolicy{} = policy) do
    %{
      id: policy.id,
      flag_key: policy.flag_key,
      environment_key: policy.environment_key,
      rule_key: policy.rule_key,
      enabled: policy.enabled,
      observation_window_seconds: policy.observation_window_seconds,
      next_stage: policy.next_stage,
      next_percentage: policy.next_percentage,
      metadata: policy.metadata,
      inserted_at: policy.inserted_at,
      updated_at: policy.updated_at
    }
  end

  defp auto_advance_policy_changeset_error(changeset, command) do
    StoreError.invalid_command("rollout auto-advance policy is invalid",
      metadata: %{
        flag_key: command.flag_key,
        environment_key: command.environment_key,
        rule_key: command.rule_key
      },
      details: collect_changeset_details(changeset),
      cause: changeset
    )
  end

  defp auto_advance_policy_field_errors(errors) do
    Enum.map(errors, fn {field, message} ->
      %{field: to_string(field), message: message}
    end)
  end

  defp rollout_auto_advance_policy_not_found_error(command) do
    StoreError.invalid_command("rollout_auto_advance_policy_not_found",
      metadata: %{
        flag_key: command.flag_key,
        environment_key: command.environment_key,
        rule_key: command.rule_key
      }
    )
  end

  defp maybe_schedule_auto_advance_tick(%Command.AdvanceRollout{} = command, _result) do
    fetch_command =
      Command.FetchRolloutAutoAdvancePolicy.new(
        command.flag_key,
        command.environment_key,
        command.rule_key
      )

    try do
      with {:ok, %{policy: policy}} <- fetch_rollout_auto_advance_policy(fetch_command),
           true <- Schedule.schedulable?(command, policy) do
        idempotency_key = Schedule.idempotency_key(command)

        :ok =
          cancel_superseded_auto_advance_ticks!(
            command.flag_key,
            command.environment_key,
            command.rule_key,
            Schedule.scheduler_actor()
          )

        schedule_command = Schedule.schedule_command(command, policy)

        case schedule_governed_action(schedule_command) do
          {:ok, _payload} ->
            :ok

          {:error, error} ->
            emit_auto_advance_schedule_failure(command, idempotency_key, error)
            :ok
        end
      else
        _ -> :ok
      end
    rescue
      error ->
        emit_auto_advance_schedule_failure_rescue(command, error)
        :ok
    end

    :ok
  end

  defp cancel_superseded_auto_advance_ticks!(flag_key, environment_key, rule_key, actor) do
    flag_key = to_string(flag_key)
    environment_key = to_string(environment_key)
    rule_key = to_string(rule_key)

    pending_auto_advance_scheduled_executions(flag_key, environment_key, rule_key)
    |> Enum.each(fn scheduled_execution ->
      _ =
        cancel_scheduled_execution(%Command.CancelScheduledExecution{
          scheduled_execution_id: scheduled_execution.id,
          actor: actor,
          reason: "superseded by new auto-advance stage advance",
          metadata: %{"source" => "guardrail_automation"}
        })
    end)

    :ok
  end

  defp pending_auto_advance_scheduled_executions(flag_key, environment_key, rule_key) do
    from(se in "scheduled_executions",
      where:
        field(se, :resource_key) == ^flag_key and
          field(se, :environment_key) == ^environment_key and
          field(se, :state) == "scheduled" and
          field(se, :governed_action) == "advance_rollout" and
          fragment("?->>'source' = ?", field(se, :metadata), "guardrail_automation"),
      select: map(se, ^scheduled_execution_fields())
    )
    |> Repo.all()
    |> Enum.map(&normalize_scheduled_execution_row/1)
    |> Enum.filter(fn scheduled_execution ->
      rollout = scheduled_execution.command_snapshot["rollout"] || %{}

      to_string(rollout["rule_key"] || "") == rule_key
    end)
  end

  defp emit_auto_advance_schedule_failure(command, idempotency_key, error) do
    Telemetry.execute(
      Telemetry.scheduled_execution_event(:failed),
      %{count: 1},
      Telemetry.metadata(%{
        flag_key: command.flag_key,
        environment_key: command.environment_key,
        rule_key: command.rule_key,
        idempotency_key: idempotency_key,
        source: "guardrail_automation",
        phase: "schedule_auto_advance_tick",
        error_type: auto_advance_schedule_error_type(error)
      })
    )
  end

  defp emit_auto_advance_schedule_failure_rescue(command, error) do
    idempotency_key =
      if match?(%DateTime{}, command.monitoring_window_ends_at) do
        Schedule.idempotency_key(command)
      else
        nil
      end

    normalized_error =
      if is_struct(error, Rulestead.Error) do
        error
      else
        StoreError.unavailable(cause: error)
      end

    emit_auto_advance_schedule_failure(command, idempotency_key, normalized_error)
  end

  defp auto_advance_schedule_error_type(%Rulestead.Error{type: type}), do: type
  defp auto_advance_schedule_error_type(error) when is_exception(error), do: error.__struct__
  defp auto_advance_schedule_error_type(_error), do: :unknown

  defp collect_changeset_details(%Changeset{} = changeset, path \\ nil) do
    own_details =
      Enum.map(changeset.errors, fn {field, {message, _opts}} ->
        detail = %{field: path_field(path, field), message: message}
        if path, do: Map.put(detail, :path, path), else: detail
      end)

    nested_details =
      Enum.flat_map(changeset.changes, fn
        {field, %Changeset{} = nested_changeset} ->
          collect_changeset_details(nested_changeset, path_field(path, field))

        {field, changesets} when is_list(changesets) ->
          changesets
          |> Enum.with_index()
          |> Enum.flat_map(fn
            {%Changeset{} = nested_changeset, index} ->
              collect_changeset_details(nested_changeset, path_field(path, "#{field}[#{index}]"))

            _ ->
              []
          end)

        _ ->
          []
      end)

    own_details ++ nested_details
  end

  defp path_field(nil, field), do: to_string(field)
  defp path_field(path, field), do: "#{path}.#{field}"

  defp rollback_audit_event(command, %AuditEvent{event_type: event_type} = audit_event)
       when event_type in ["kill_switch.engage", "kill_switch.release"] do
    inverse_operation =
      if event_type == "kill_switch.engage",
        do: :release_kill_switch,
        else: :engage_kill_switch

    inverse_status =
      if event_type == "kill_switch.engage",
        do: %{status: :active, kill_switch_variant_key: nil},
        else: %{status: :killswitched, kill_switch_variant_key: "default"}

    with {:ok, environment} <- fetch_environment(audit_event.environment_key),
         {:ok, flag, flag_environment} <-
           fetch_flag_environment(audit_event.resource_key, environment.key),
         :ok <- ensure_not_archived(audit_event.resource_key, flag) do
      before_state = audit_state(flag_environment)

      Multi.new()
      |> Multi.update(
        :flag_environment,
        FlagEnvironment.changeset(flag_environment, inverse_status)
      )
      |> Multi.run(:runtime_snapshot, fn repo, _changes ->
        insert_runtime_snapshot(repo, environment, now())
      end)
      |> Multi.insert(
        :audit_event,
        audit_event_changeset(%AuditEvent{}, command, "audit.rollback", :ok, %{
          environment_key: audit_event.environment_key,
          resource_key: audit_event.resource_key,
          before: before_state,
          after: %{
            "status" => inverse_status.status,
            "kill_switch_variant_key" => inverse_status.kill_switch_variant_key
          },
          rollback_of_event_id: audit_event.id,
          links: %{"inverse_event_type" => inverse_operation}
        })
      )
      |> Repo.transact()
      |> case do
        {:ok, %{audit_event: rollback_event}} ->
          {:ok, %{audit_event: AuditEvent.serialize(rollback_event)}}

        {:error, :flag_environment, %Changeset{} = changeset, _changes} ->
          {:error,
           store_changeset_error(changeset, audit_event.resource_key, audit_event.environment_key)}

        {:error, _operation, reason, _changes} ->
          {:error, StoreError.unavailable(cause: reason)}
      end
    end
  end

  defp rollback_audit_event(_command, _audit_event),
    do: {:error, StoreError.invalid_command("audit event cannot be rolled back")}

  defp audit_multi(multi, key, command, ruleset, environment, previous_ruleset) do
    Multi.insert(
      multi,
      key,
      AuditEvent.changeset(%AuditEvent{}, %{
        event_type: audit_event_type(command),
        resource_type: "flag",
        resource_key: audit_flag_key(command),
        environment_key: environment && environment.key,
        actor_id: actor_value(command.actor, "id"),
        actor_type: actor_value(command.actor, "type"),
        actor_display: actor_value(command.actor, "display"),
        reason: Map.get(command, :reason),
        result: :ok,
        metadata: audit_metadata(command, ruleset, previous_ruleset),
        correlation_id: correlation_id(command),
        occurred_at: now()
      })
    )
  end

  defp audit_event_type(%Command.SaveDraftRuleset{}), do: "ruleset.save_draft"
  defp audit_event_type(%Command.PublishRuleset{}), do: "ruleset.publish"
  defp audit_event_type(%Command.ArchiveFlag{}), do: "flag.archive"

  defp audit_flag_key(command), do: to_string(Map.get(command, :flag_key))

  defp audit_metadata(command, ruleset, previous_ruleset) do
    metadata =
      command
      |> Map.get(:metadata, %{})
      |> Map.new()
      |> Map.take([
        :source,
        "source",
        :request_id,
        "request_id",
        :change_request_id,
        "change_request_id",
        :governance_action,
        "governance_action",
        :execution_stage,
        "execution_stage"
      ])

    metadata =
      if ruleset do
        metadata
        |> Map.put(:version, ruleset.version)
        |> Map.merge(ruleset_audit_metadata(previous_ruleset, ruleset))
      else
        metadata
      end

    metadata
  end

  defp correlation_id(command) do
    command.metadata[:request_id] || command.metadata["request_id"]
  end

  defp audit_state(flag_environment) do
    %{
      "status" => flag_environment.status,
      "kill_switch_variant_key" => flag_environment.kill_switch_variant_key
    }
  end

  defp audit_result(command) do
    command.metadata[:audit_result] || command.metadata["audit_result"]
  end

  defp insert_audit_only_event(command, event_type, result) do
    %AuditEvent{}
    |> audit_event_changeset(command, event_type, result, %{})
    |> Repo.insert()
  end

  defp audit_event_changeset(audit_event, command, event_type, result, opts) do
    metadata =
      AuditEvent.metadata(%{
        before: Map.get(opts, :before, %{}),
        after: Map.get(opts, :after, %{}),
        diff: diff_map(Map.get(opts, :before, %{}), Map.get(opts, :after, %{})),
        links: Map.get(opts, :links, %{}),
        tenant: Command.GovernanceSupport.tenant_provenance(command),
        context: Map.get(command, :metadata, %{}),
        request_id: correlation_id(command),
        source: command.metadata[:source] || command.metadata["source"],
        rollback_of_event_id: Map.get(opts, :rollback_of_event_id)
      })
      |> Map.merge(Map.new(Map.get(opts, :metadata, %{})))

    AuditEvent.changeset(audit_event, %{
      event_type: event_type,
      resource_type: "flag",
      resource_key: to_string(Map.get(opts, :resource_key, Map.get(command, :flag_key))),
      environment_key:
        to_string(Map.get(opts, :environment_key, Map.get(command, :environment_key))),
      actor_id: actor_value(command.actor, "id"),
      actor_type: to_string(actor_value(command.actor, "type") || "operator"),
      actor_display: actor_value(command.actor, "display"),
      reason: Map.get(command, :reason),
      result: result,
      metadata: metadata,
      correlation_id: correlation_id(command),
      occurred_at: now()
    })
  end

  defp insert_change_request(repo, command, correlation_id, submitted_at) do
    attrs = %{
      status: "submitted",
      governed_action: Atom.to_string(command.action),
      environment_key: command.environment_key,
      resource_type: command.resource_type,
      resource_key: command.resource_key,
      submitter_id: actor_value(command.actor, "id"),
      submitter_type: actor_value(command.actor, "type") || "operator",
      submitter_display: actor_value(command.actor, "display"),
      reason: command.reason,
      approval_requirement_snapshot: command.approval_requirement,
      command_snapshot: Command.GovernanceSupport.with_tenant_provenance(command.command),
      metadata: command.metadata,
      correlation_id: correlation_id,
      submitted_at: submitted_at,
      resolved_at: nil,
      executed_at: nil,
      inserted_at: submitted_at,
      updated_at: submitted_at
    }

    case repo.insert_all("change_requests", [attrs],
           returning: [
             :id,
             :status,
             :governed_action,
             :environment_key,
             :resource_type,
             :resource_key,
             :submitter_id,
             :submitter_type,
             :submitter_display,
             :reason,
             :approval_requirement_snapshot,
             :command_snapshot,
             :metadata,
             :correlation_id,
             :submitted_at,
             :resolved_at,
             :executed_at,
             :inserted_at,
             :updated_at
           ]
         ) do
      {1, [row]} -> {:ok, normalize_governance_row(row)}
      _ -> {:error, StoreError.unavailable()}
    end
  end

  defp insert_approval(repo, change_request, command, decision, reviewed_at) do
    attrs = %{
      change_request_id: uuid_param(change_request.id),
      decision: decision,
      reviewer_id: actor_value(command.actor, "id"),
      reviewer_type: actor_value(command.actor, "type") || "operator",
      reviewer_display: actor_value(command.actor, "display"),
      reason: command.reason,
      metadata: command.metadata,
      correlation_id: change_request.correlation_id,
      reviewed_at: reviewed_at,
      inserted_at: reviewed_at
    }

    case repo.insert_all("approvals", [attrs],
           returning: [
             :id,
             :change_request_id,
             :decision,
             :reviewer_id,
             :reviewer_type,
             :reviewer_display,
             :reason,
             :metadata,
             :correlation_id,
             :reviewed_at,
             :inserted_at
           ]
         ) do
      {1, [row]} -> {:ok, normalize_governance_row(row)}
      _ -> {:error, StoreError.unavailable()}
    end
  rescue
    error in [ConstraintError] ->
      {:error,
       StoreError.invalid_command("reviewer has already recorded a decision", cause: error)}
  end

  defp update_change_request(repo, change_request, attrs) do
    updates =
      attrs
      |> Enum.reject(fn {_key, value} -> is_nil(value) end)
      |> Map.new()

    query = from(cr in "change_requests", where: field(cr, :id) == ^uuid_param(change_request.id))

    case repo.update_all(query, set: Enum.to_list(updates)) do
      {1, _rows} -> fetch_change_request_row(change_request.id)
      _ -> {:error, StoreError.invalid_command("change request was not found")}
    end
  end

  defp fetch_change_request_row(change_request_id) do
    case from(cr in "change_requests",
           where: field(cr, :id) == ^uuid_param(change_request_id),
           select:
             map(cr, [
               :id,
               :status,
               :governed_action,
               :environment_key,
               :resource_type,
               :resource_key,
               :submitter_id,
               :submitter_type,
               :submitter_display,
               :reason,
               :approval_requirement_snapshot,
               :command_snapshot,
               :metadata,
               :correlation_id,
               :submitted_at,
               :resolved_at,
               :executed_at,
               :inserted_at,
               :updated_at
             ])
         )
         |> Repo.one() do
      nil -> {:error, StoreError.invalid_command("change request was not found")}
      row -> {:ok, normalize_governance_row(row)}
    end
  end

  defp list_approval_rows(change_request_id) do
    from(a in "approvals",
      where: field(a, :change_request_id) == ^uuid_param(change_request_id),
      order_by: [asc: field(a, :reviewed_at)],
      select:
        map(a, [
          :id,
          :change_request_id,
          :decision,
          :reviewer_id,
          :reviewer_type,
          :reviewer_display,
          :reason,
          :metadata,
          :correlation_id,
          :reviewed_at,
          :inserted_at
        ])
    )
    |> Repo.all()
    |> Enum.map(&normalize_governance_row/1)
  end

  defp list_change_request_audit_events(change_request) do
    AuditEvent
    |> where([event], event.correlation_id == ^change_request.correlation_id)
    |> order_by([event], asc: event.occurred_at, asc: event.inserted_at)
    |> Repo.all()
    |> Enum.map(&AuditEvent.serialize/1)
  end

  defp approved_count(repo, change_request_id) do
    from(a in "approvals",
      where:
        field(a, :change_request_id) == ^uuid_param(change_request_id) and
          field(a, :decision) == "approved",
      select: count("*")
    )
    |> repo.one()
  end

  defp ensure_governance_transition(change_request, allowed_statuses) do
    if change_request.status in allowed_statuses do
      :ok
    else
      {:error,
       StoreError.invalid_command("change request is not in a valid state for this operation")}
    end
  end

  defp ensure_unique_reviewer(change_request_id, command) do
    reviewer_id = actor_value(command.actor, "id")

    exists? =
      from(a in "approvals",
        where:
          field(a, :change_request_id) == ^uuid_param(change_request_id) and
            field(a, :reviewer_id) == ^reviewer_id,
        select: count("*")
      )
      |> Repo.one()

    if exists? > 0 do
      {:error, StoreError.invalid_command("reviewer has already recorded a decision")}
    else
      :ok
    end
  end

  defp required_approvals(snapshot) do
    snapshot["required_approvals"] || snapshot[:required_approvals] || 0
  end

  defp serialize_change_request_row(change_request) do
    %{
      id: change_request.id,
      state: governance_state(change_request.status),
      action: governance_action(change_request.governed_action),
      environment_key: change_request.environment_key,
      resource_type: change_request.resource_type,
      resource_key: change_request.resource_key,
      submitted_by: %{
        id: change_request.submitter_id,
        type: change_request.submitter_type,
        display: change_request.submitter_display
      },
      command: change_request.command_snapshot,
      approval_requirement:
        normalize_approval_requirement_snapshot(change_request.approval_requirement_snapshot),
      correlation_id: change_request.correlation_id,
      metadata: change_request.metadata
    }
    |> ChangeRequest.new()
    |> ChangeRequest.serialize()
    |> Map.put(:id, change_request.id)
  end

  defp serialize_approval_row(approval) do
    %{
      change_request_id: approval.change_request_id,
      decision: governance_decision(approval.decision),
      reviewed_by: %{
        id: approval.reviewer_id,
        type: approval.reviewer_type,
        display: approval.reviewer_display
      },
      reason: approval.reason,
      correlation_id: approval.correlation_id
    }
    |> Approval.new()
    |> Approval.serialize()
    |> Map.put(:id, approval.id)
  end

  defp governance_state("submitted"), do: :submitted
  defp governance_state("approved"), do: :approved
  defp governance_state("rejected"), do: :rejected
  defp governance_state("cancelled"), do: :cancelled
  defp governance_state("executed"), do: :executed

  defp governance_decision("approved"), do: :approved
  defp governance_decision(_decision), do: :rejected

  defp governance_action(action) when is_binary(action) do
    action
    |> String.trim()
    |> String.to_existing_atom()
  rescue
    ArgumentError -> :manage_settings
  end

  defp governance_correlation_id(command) do
    command.metadata[:request_id] || command.metadata["request_id"] || Ecto.UUID.generate()
  end

  defp prepare_audience_mutation_change_request(
         %Command.SubmitChangeRequest{
           action: :apply_audience_mutation
         } = command
       ) do
    apply_command =
      Command.ApplyAudienceMutation.new(
        command.command,
        actor: command.actor,
        reason: command.reason,
        metadata: command.metadata
      )

    with {:ok, environment} <- fetch_environment(apply_command.environment_key),
         {:ok, audience} <- fetch_audience_for_mutation(apply_command.audience_key),
         :ok <- ensure_audience_active(audience),
         {:ok, current_preview} <-
           audience_preview_payload(Repo, environment.key, audience, apply_command) do
      with :ok <- AudienceMutationChangeRequest.validate_submit(command, current_preview),
           {:ok, assessment} <- audience_mutation_submit_assessment(command, current_preview) do
        metadata =
          command.metadata
          |> Map.new()
          |> Map.merge(
            AudienceMutationChangeRequest.build_submission_metadata(assessment, current_preview)
          )

        {:ok,
         %{
           command
           | metadata: metadata,
             resource_type: "audience",
             resource_key: apply_command.audience_key
         }}
      end
    end
  end

  defp prepare_audience_mutation_change_request(command), do: {:ok, command}

  defp audience_mutation_submit_assessment(command, current_preview) do
    mutation_command = command.command || %{}
    references = Map.get(current_preview, :affected_references) || []

    BlastRadiusThreshold.assess(%{
      environment_key: command.environment_key,
      operation: Map.get(mutation_command, "operation") || Map.get(mutation_command, :operation),
      preview_fingerprint:
        Map.get(mutation_command, "preview_fingerprint") ||
          Map.get(mutation_command, :preview_fingerprint),
      preview_schema_version:
        Map.get(mutation_command, "preview_schema_version") ||
          Map.get(mutation_command, :preview_schema_version),
      affected_references: references,
      affected_reference_keys:
        Map.get(mutation_command, "affected_reference_keys") ||
          Map.get(mutation_command, :affected_reference_keys),
      tenant_key:
        Map.get(mutation_command, "tenant_key") || Map.get(mutation_command, :tenant_key)
    })
  end

  defp audience_mutation_terminal_metadata(
         %{governed_action: "apply_audience_mutation"} = change_request,
         reason
       ) do
    metadata = change_request.metadata || %{}

    %{
      "blast_radius_assessment" => Map.get(metadata, "blast_radius_assessment"),
      "affected_reference_summary" => Map.get(metadata, "affected_reference_summary"),
      "preview_evidence_summary" => Map.get(metadata, "preview_evidence_summary"),
      "terminal_reason" => reason
    }
  end

  defp audience_mutation_terminal_metadata(_change_request, _reason), do: %{}

  defp governance_audit_command(command, change_request, stage) do
    metadata =
      command.metadata
      |> Map.merge(%{
        "request_id" => change_request.correlation_id,
        "change_request_id" => change_request.id,
        "governance_action" => change_request.governed_action,
        "execution_stage" => stage,
        "resource_key" => change_request.resource_key,
        "tenant" => change_request.command_snapshot["tenant"]
      })

    Map.merge(command, %{metadata: metadata, actor: command.actor, reason: command.reason})
  end

  defp execute_governed_change(%{governed_action: governed_action} = change_request, command)
       when governed_action in [
              "publish_ruleset",
              "advance_rollout",
              "engage_kill_switch",
              "release_kill_switch",
              "promote_environment",
              "apply_audience_mutation"
            ] do
    execute_bounded_governed_action(governed_action, change_request, command)
  end

  defp execute_governed_change(_change_request, _command) do
    {:error, StoreError.invalid_command("governed action is not implemented")}
  end

  defp insert_scheduled_execution(attrs, command) do
    case fetch_idempotent_scheduled_execution(attrs.idempotency_key) do
      {:ok, existing} ->
        {:ok,
         %{
           scheduled_execution: serialize_scheduled_execution_row(existing),
           attempts:
             list_execution_attempt_rows(existing.id)
             |> Enum.map(&serialize_execution_attempt_row/1)
         }}

      :not_found ->
        do_insert_scheduled_execution(attrs, command)
    end
  end

  defp do_insert_scheduled_execution(attrs, command) do
    Multi.new()
    |> Multi.run(:scheduled_execution, fn repo, _changes ->
      case repo.insert_all("scheduled_executions", [attrs],
             returning: scheduled_execution_fields()
           ) do
        {1, [row]} -> {:ok, normalize_scheduled_execution_row(row)}
        _ -> {:error, StoreError.unavailable()}
      end
    end)
    |> Multi.run(:oban_job, fn repo, %{scheduled_execution: scheduled_execution} ->
      enqueue_scheduled_execution_job(
        repo,
        scheduled_execution,
        command.actor,
        scheduled_execution.scheduled_for
      )
    end)
    |> Multi.run(:persist_job_id, fn repo,
                                     %{
                                       scheduled_execution: scheduled_execution,
                                       oban_job: oban_job
                                     } ->
      repo.update_all(
        from(se in "scheduled_executions",
          where: field(se, :id) == ^uuid_param(scheduled_execution.id)
        ),
        set: [last_oban_job_id: oban_job.id, updated_at: now()]
      )

      fetch_scheduled_execution_row(scheduled_execution.id)
    end)
    |> Multi.run(:audit_event, fn repo, %{persist_job_id: scheduled_execution} ->
      insert_scheduled_execution_audit_event(
        repo,
        scheduled_execution,
        command,
        "scheduled_execution.scheduled",
        :ok
      )
    end)
    |> Repo.transact()
    |> case do
      {:ok, %{persist_job_id: scheduled_execution, audit_event: audit_event}} ->
        emit_scheduled_execution_telemetry(:scheduled, command, scheduled_execution, audit_event)

        {:ok,
         %{
           scheduled_execution: serialize_scheduled_execution_row(scheduled_execution),
           attempts: []
         }}

      {:error, _operation, reason, _changes} ->
        {:error, normalize_governance_failure(reason)}
    end
  end

  defp enqueue_scheduled_execution_job(repo, scheduled_execution, actor, scheduled_at) do
    existing_job_id =
      from(job in "oban_jobs",
        where:
          fragment("?->>'scheduled_execution_id' = ?", field(job, :args), ^scheduled_execution.id) and
            field(job, :state) in ["scheduled", "available", "executing", "retryable"],
        select: field(job, :id),
        limit: 1
      )
      |> repo.one()

    case existing_job_id do
      nil ->
        job =
          scheduled_execution
          |> ScheduledExecution.new()
          |> Map.put(:scheduled_for, scheduled_at)
          |> Oban.scheduled_execution_job(
            Context.new(
              actor: Map.new(actor || %{}),
              environment: scheduled_execution.environment_key,
              request_id: scheduled_execution.correlation_id,
              attributes: %{"source" => "scheduled_execution_worker"}
            )
          )

        case repo.insert_all("oban_jobs", [job],
               returning: [:id, :worker, :args, :scheduled_at, :state]
             ) do
          {1, [row]} -> {:ok, Map.new(row)}
          _ -> {:error, StoreError.unavailable()}
        end

      job_id ->
        {:ok, %{id: job_id}}
    end
  end

  defp run_scheduled_execution(command, scheduled_execution) do
    attempt_number = scheduled_execution.attempt_count + 1
    started_at = now()

    with {:ok, attempt} <-
           insert_execution_attempt_row(
             scheduled_execution.id,
             attempt_number,
             "running",
             started_at,
             nil,
             command.metadata
           ) do
      case perform_scheduled_execution(scheduled_execution, command) do
        {:ok, execution_result} ->
          finalize_scheduled_execution_success(
            scheduled_execution,
            attempt,
            command,
            execution_result
          )

        {:error, reason} ->
          finalize_scheduled_execution_failure(scheduled_execution, attempt, command, reason)
      end
    end
  end

  defp perform_scheduled_execution(
         %{change_request_id: change_request_id} = _scheduled_execution,
         command
       )
       when is_binary(change_request_id) do
    with {:ok, change_request} <- fetch_change_request_row(change_request_id),
         {:ok, execution_result, _updated_change_request, _audit_event} <-
           execute_governed_change(change_request, command) do
      {:ok, execution_result}
    end
  end

  defp perform_scheduled_execution(
         %{governed_action: governed_action} = scheduled_execution,
         command
       )
       when governed_action in [
              "publish_ruleset",
              "advance_rollout",
              "engage_kill_switch",
              "release_kill_switch",
              "promote_environment"
            ] do
    execute_direct_scheduled_action(governed_action, scheduled_execution, command)
  end

  defp perform_scheduled_execution(_scheduled_execution, _command) do
    {:error, StoreError.invalid_command("governed action is not implemented")}
  end

  defp execute_bounded_governed_action("publish_ruleset", change_request, command) do
    with {:ok, _environment, _flag, flag_environment} <-
           fetch_schedulable_flag_context(
             change_request.resource_key,
             change_request.environment_key
           ),
         {:ok, _ruleset} <-
           ensure_publishable_ruleset(
             flag_environment,
             change_request.environment_key,
             change_request.command_snapshot["version"]
           ),
         {:ok, execution_result} <-
           publish_ruleset(
             Command.PublishRuleset.new(
               change_request.resource_key,
               change_request.environment_key,
               version: change_request.command_snapshot["version"],
               actor: command.actor,
               reason: command.reason,
               metadata: %{
                 request_id: change_request.correlation_id,
                 source: change_request.metadata["source"],
                 change_request_id: change_request.id,
                 governance_action: change_request.governed_action,
                 execution_stage: "execute"
               }
             )
           ) do
      persisted_change_request =
        case fetch_change_request_row(change_request.id) do
          {:ok, current_change_request} -> current_change_request
          {:error, _error} -> change_request
        end

      {:ok, execution_result, persisted_change_request, nil}
    end
  end

  defp execute_bounded_governed_action("engage_kill_switch", change_request, command) do
    with {:ok, _environment, _flag, flag_environment} <-
           fetch_schedulable_flag_context(
             change_request.resource_key,
             change_request.environment_key
           ),
         :ok <- ensure_kill_switch_transition(flag_environment, :engage),
         {:ok, execution_result} <-
           engage_kill_switch(
             Command.EngageKillSwitch.new(
               change_request.resource_key,
               change_request.environment_key,
               actor: command.actor,
               reason: command.reason,
               metadata: %{
                 request_id: change_request.correlation_id,
                 source: change_request.metadata["source"],
                 change_request_id: change_request.id,
                 governance_action: change_request.governed_action,
                 execution_stage: "execute"
               }
             )
           ) do
      {:ok, execution_result, change_request, nil}
    end
  end

  defp execute_bounded_governed_action("release_kill_switch", change_request, command) do
    with {:ok, _environment, _flag, flag_environment} <-
           fetch_schedulable_flag_context(
             change_request.resource_key,
             change_request.environment_key
           ),
         :ok <- ensure_kill_switch_transition(flag_environment, :release),
         {:ok, execution_result} <-
           release_kill_switch(
             Command.ReleaseKillSwitch.new(
               change_request.resource_key,
               change_request.environment_key,
               actor: command.actor,
               reason: command.reason,
               metadata: %{
                 request_id: change_request.correlation_id,
                 source: change_request.metadata["source"],
                 change_request_id: change_request.id,
                 governance_action: change_request.governed_action,
                 execution_stage: "execute"
               }
             )
           ) do
      {:ok, execution_result, change_request, nil}
    end
  end

  defp execute_bounded_governed_action("advance_rollout", change_request, command) do
    advance_rollout(
      Command.AdvanceRollout.new(
        change_request.resource_key,
        change_request.environment_key,
        Map.merge(
          change_request.command_snapshot["rollout"] || change_request.command_snapshot,
          %{"signal_facts" => change_request.command_snapshot["signal_facts"]}
        ),
        actor: command.actor,
        reason: command.reason,
        metadata: %{
          request_id: change_request.correlation_id,
          source: change_request.metadata["source"],
          change_request_id: change_request.id,
          governance_action: change_request.governed_action,
          execution_stage: "execute",
          tenant: change_request.command_snapshot["tenant"],
          signal_facts: change_request.command_snapshot["signal_facts"]
        }
      )
    )
  end

  defp execute_bounded_governed_action("apply_audience_mutation", change_request, command) do
    apply_command =
      change_request.command_snapshot
      |> Command.ApplyAudienceMutation.new(
        actor: command.actor,
        reason: command.reason,
        metadata:
          Map.merge(command.metadata || %{}, %{
            "change_request_id" => change_request.id,
            "execution_stage" => "execute",
            "request_id" => change_request.correlation_id
          })
      )

    with {:ok, execution_result} <-
           apply_confirmed_audience_mutation(apply_command,
             governed_apply?: true,
             audit_blocked?: false
           ) do
      persisted_change_request =
        case fetch_change_request_row(change_request.id) do
          {:ok, current_change_request} -> current_change_request
          {:error, _error} -> change_request
        end

      {:ok, execution_result, persisted_change_request, nil}
    end
  end

  defp execute_bounded_governed_action("promote_environment", change_request, command) do
    promotion_command =
      change_request.command_snapshot
      |> Command.ApplyPromotion.new(
        actor: command.actor,
        reason: command.reason,
        metadata:
          promotion_execution_metadata(
            command.metadata,
            change_request.correlation_id,
            change_request.metadata["source"],
            change_request_id: change_request.id,
            execution_stage: "execute"
          )
      )

    with :ok <- Apply.validate_governed_snapshot(promotion_command),
         {:ok, execution_result} <-
           run_promotion_apply(promotion_command, allow_protected_target?: true) do
      persisted_change_request =
        case fetch_change_request_row(change_request.id) do
          {:ok, current_change_request} -> current_change_request
          {:error, _error} -> change_request
        end

      {:ok, execution_result, persisted_change_request, nil}
    end
  end

  defp execute_direct_scheduled_action("publish_ruleset", scheduled_execution, command) do
    with {:ok, _environment, _flag, flag_environment} <-
           fetch_schedulable_flag_context(
             scheduled_execution.resource_key,
             scheduled_execution.environment_key
           ),
         {:ok, _ruleset} <-
           ensure_publishable_ruleset(
             flag_environment,
             scheduled_execution.environment_key,
             scheduled_execution.command_snapshot["version"]
           ) do
      publish_ruleset(
        Command.PublishRuleset.new(
          scheduled_execution.resource_key,
          scheduled_execution.environment_key,
          version: scheduled_execution.command_snapshot["version"],
          actor: command.actor,
          reason: command.reason,
          metadata: %{
            request_id: scheduled_execution.correlation_id,
            source: scheduled_execution.metadata["source"],
            scheduled_execution_id: scheduled_execution.id,
            execution_stage: "scheduled_execution"
          }
        )
      )
    end
  end

  defp execute_direct_scheduled_action("engage_kill_switch", scheduled_execution, command) do
    with {:ok, _environment, _flag, flag_environment} <-
           fetch_schedulable_flag_context(
             scheduled_execution.resource_key,
             scheduled_execution.environment_key
           ),
         :ok <- ensure_kill_switch_transition(flag_environment, :engage) do
      engage_kill_switch(
        Command.EngageKillSwitch.new(
          scheduled_execution.resource_key,
          scheduled_execution.environment_key,
          actor: command.actor,
          reason: command.reason,
          metadata: %{
            request_id: scheduled_execution.correlation_id,
            source: scheduled_execution.metadata["source"],
            scheduled_execution_id: scheduled_execution.id,
            execution_stage: "scheduled_execution"
          }
        )
      )
    end
  end

  defp execute_direct_scheduled_action("release_kill_switch", scheduled_execution, command) do
    with {:ok, _environment, _flag, flag_environment} <-
           fetch_schedulable_flag_context(
             scheduled_execution.resource_key,
             scheduled_execution.environment_key
           ),
         :ok <- ensure_kill_switch_transition(flag_environment, :release) do
      release_kill_switch(
        Command.ReleaseKillSwitch.new(
          scheduled_execution.resource_key,
          scheduled_execution.environment_key,
          actor: command.actor,
          reason: command.reason,
          metadata: %{
            request_id: scheduled_execution.correlation_id,
            source: scheduled_execution.metadata["source"],
            scheduled_execution_id: scheduled_execution.id,
            execution_stage: "scheduled_execution"
          }
        )
      )
    end
  end

  defp execute_direct_scheduled_action("advance_rollout", scheduled_execution, command) do
    if RolloutAutoAdvance.automation_tick?(scheduled_execution.metadata) do
      case RolloutAutoAdvance.execute_scheduled_tick(__MODULE__, scheduled_execution, command) do
        {:ok, %{outcome: :blocked} = blocked_result} ->
          {:ok, blocked_result}

        {:ok, %{outcome: :change_request_submitted} = cr_result} ->
          {:ok, cr_result}

        {:ok, %Command.AdvanceRollout{} = advance_command} ->
          advance_rollout(advance_command)

        {:error, reason} ->
          {:error, reason}
      end
    else
      advance_rollout(
        Command.AdvanceRollout.new(
          scheduled_execution.resource_key,
          scheduled_execution.environment_key,
          Map.merge(
            scheduled_execution.command_snapshot["rollout"] ||
              scheduled_execution.command_snapshot,
            %{"signal_facts" => scheduled_execution.command_snapshot["signal_facts"]}
          ),
          actor: command.actor,
          reason: command.reason,
          metadata: %{
            request_id: scheduled_execution.correlation_id,
            source: scheduled_execution.metadata["source"],
            scheduled_execution_id: scheduled_execution.id,
            execution_stage: "scheduled_execution",
            tenant: scheduled_execution.command_snapshot["tenant"],
            signal_facts: scheduled_execution.command_snapshot["signal_facts"]
          }
        )
      )
    end
  end

  defp execute_direct_scheduled_action("promote_environment", scheduled_execution, command) do
    promotion_command =
      scheduled_execution.command_snapshot
      |> Command.ApplyPromotion.new(
        actor: command.actor,
        reason: command.reason,
        metadata:
          promotion_execution_metadata(
            command.metadata,
            scheduled_execution.correlation_id,
            scheduled_execution.metadata["source"],
            scheduled_execution_id: scheduled_execution.id,
            execution_stage: "scheduled_execution"
          )
      )

    with :ok <- Apply.validate_governed(promotion_command),
         {:ok, execution_result} <-
           run_promotion_apply(promotion_command, allow_protected_target?: true) do
      {:ok, execution_result}
    end
  end

  defp promotion_execution_metadata(metadata, request_id, source, links) do
    metadata
    |> Map.merge(%{
      request_id: request_id,
      source: source,
      governance_action: "promote_environment"
    })
    |> Map.merge(Map.new(links))
  end

  defp fetch_schedulable_flag_context(resource_key, environment_key) do
    with {:ok, environment} <- fetch_environment(environment_key),
         {:ok, flag, flag_environment} <- fetch_flag_environment(resource_key, environment.key),
         :ok <- ensure_schedulable_flag_not_archived(flag) do
      {:ok, environment, flag, flag_environment}
    end
  end

  defp ensure_schedulable_flag_not_archived(%{archived_at: nil}), do: :ok

  defp ensure_schedulable_flag_not_archived(_flag),
    do: {:error, StoreError.invalid_command("archived_resource")}

  defp ensure_publishable_ruleset(flag_environment, environment_key, version) do
    case resolve_publishable_ruleset(flag_environment, environment_key, version) do
      {:ok, ruleset} -> {:ok, ruleset}
      {:error, _error} -> {:error, StoreError.invalid_command("ruleset_not_publishable")}
    end
  end

  defp ensure_kill_switch_transition(flag_environment, :engage) do
    if flag_environment.status == :killswitched or
         not is_nil(flag_environment.kill_switch_variant_key) do
      {:error, StoreError.invalid_command("kill_switch_already_engaged")}
    else
      :ok
    end
  end

  defp ensure_kill_switch_transition(flag_environment, :release) do
    if flag_environment.status == :killswitched or
         not is_nil(flag_environment.kill_switch_variant_key) do
      :ok
    else
      {:error, StoreError.invalid_command("kill_switch_already_released")}
    end
  end

  defp ensure_active_ruleset(%FlagEnvironment{} = flag_environment, _command) do
    case active_ruleset(flag_environment) do
      %Ruleset{} = ruleset -> {:ok, ruleset}
      nil -> {:error, StoreError.invalid_command("rollout_stage_conflict")}
    end
  end

  defp resolve_rollout_rule(%Ruleset{} = ruleset, nil) do
    rollout_rules = Enum.filter(ruleset.rules, &match?(%{rollout: %_{}}, &1))

    case rollout_rules do
      [rule] -> {:ok, rule}
      _other -> {:error, StoreError.invalid_command("rollout_stage_conflict")}
    end
  end

  defp resolve_rollout_rule(%Ruleset{} = ruleset, rule_key) do
    case Enum.find(ruleset.rules, &(&1.key == rule_key and not is_nil(&1.rollout))) do
      nil -> {:error, StoreError.invalid_command("rollout_stage_conflict")}
      rule -> {:ok, rule}
    end
  end

  defp ensure_rollout_percentage(nil),
    do: {:error, StoreError.invalid_command("rollout_stage_conflict")}

  defp ensure_rollout_percentage(percentage)
       when is_integer(percentage) and percentage >= 0 and percentage <= 100,
       do: {:ok, percentage}

  defp ensure_rollout_percentage(_percentage),
    do: {:error, StoreError.invalid_command("rollout_stage_conflict")}

  defp advanced_ruleset_attrs(%Ruleset{} = ruleset, rule_key, percentage) do
    rules =
      Enum.map(ruleset.rules, fn rule ->
        if rule.key == rule_key and rule.rollout do
          Map.put(rule, :rollout, %{rule.rollout | percentage: percentage})
        else
          rule
        end
      end)
      |> Enum.map(&serialize_rule/1)

    {:ok, %{salt: ruleset.salt, metadata: ruleset.metadata, rules: rules}}
  end

  defp advance_decision_attrs(command, ruleset, rollout_rule, occurred_at) do
    %{
      flag_key: command.flag_key,
      environment_key: command.environment_key,
      rule_key: rollout_rule.key,
      stage: command.stage,
      tenant_key:
        command.metadata["tenant_key"] || get_in(command.metadata, ["tenant", "tenant_key"]),
      decision_state: :pending_data,
      action_type: :advance,
      decision_reason: "monitoring_window_active",
      effective_percentage: command.percentage,
      rollout_salt: rollout_rule.rollout.salt,
      variant_fingerprint: variant_fingerprint(rollout_rule),
      monitoring_window_started_at: command.monitoring_window_started_at || occurred_at,
      monitoring_window_ends_at: command.monitoring_window_ends_at,
      occurred_at: occurred_at,
      signal_facts: Enum.map(command.signal_facts, &SignalFact.metadata/1),
      guardrail_evidence: first_guardrail_evidence(command.signal_facts),
      authored_snapshot: serialize_ruleset(ruleset),
      correlation_id: correlation_id(command),
      metadata: command.metadata
    }
  end

  defp execute_guardrail_decision(
         command,
         environment,
         flag,
         flag_environment,
         active_ruleset,
         rollout_rule,
         evaluated
       ) do
    stable_target =
      if evaluated.state == :rollback_triggered do
        latest_stable_guardrail_snapshot(
          command.flag_key,
          environment.key,
          rollout_rule.key,
          rollout_rule.rollout.salt,
          variant_fingerprint(rollout_rule),
          correlation_id(command)
        )
      end

    case evaluated.state do
      :rollback_triggered when is_nil(stable_target) ->
        persist_guardrail_decision_without_mutation(
          command,
          environment,
          flag_environment,
          active_ruleset,
          rollout_rule,
          %{evaluated | state: :held, reason: "stable_target_missing"},
          :hold,
          "rollout.guardrail_held"
        )

      :rollback_triggered ->
        rollback_from_stable_snapshot(
          command,
          environment,
          flag,
          flag_environment,
          active_ruleset,
          rollout_rule,
          evaluated,
          stable_target
        )

      :held ->
        persist_guardrail_decision_without_mutation(
          command,
          environment,
          flag_environment,
          active_ruleset,
          rollout_rule,
          evaluated,
          :hold,
          "rollout.guardrail_held"
        )

      _other ->
        persist_guardrail_decision_without_mutation(
          command,
          environment,
          flag_environment,
          active_ruleset,
          rollout_rule,
          evaluated,
          :evaluate,
          "rollout.guardrail_evaluated"
        )
    end
  end

  defp persist_guardrail_decision_without_mutation(
         command,
         environment,
         flag_environment,
         active_ruleset,
         rollout_rule,
         evaluated,
         action_type,
         event_type
       ) do
    occurred_at = evaluated.evaluated_at |> DateTime.truncate(:microsecond)

    authored_snapshot =
      if evaluated.state == :healthy and evaluated.monitoring_window_closed? do
        serialize_ruleset(active_ruleset)
      end

    Multi.new()
    |> Multi.insert(
      :decision,
      GuardrailDecision.changeset(%GuardrailDecision{}, %{
        flag_key: command.flag_key,
        environment_key: environment.key,
        rule_key: rollout_rule.key,
        stage: command.stage,
        tenant_key:
          command.metadata["tenant_key"] || get_in(command.metadata, ["tenant", "tenant_key"]),
        decision_state: evaluated.state,
        action_type: action_type,
        decision_reason: evaluated.reason,
        effective_percentage: rollout_rule.rollout.percentage,
        rollout_salt: rollout_rule.rollout.salt,
        variant_fingerprint: variant_fingerprint(rollout_rule),
        monitoring_window_started_at: command.monitoring_window_started_at,
        monitoring_window_ends_at: command.monitoring_window_ends_at,
        occurred_at: occurred_at,
        signal_facts: Enum.map(evaluated.signal_facts, &SignalFact.metadata/1),
        guardrail_evidence: first_guardrail_evidence(evaluated.signal_facts),
        authored_snapshot: authored_snapshot,
        correlation_id: correlation_id(command),
        metadata: command.metadata
      })
    )
    |> Multi.run(:audit_event, fn repo, %{decision: decision} ->
      repo.insert(
        audit_event_changeset(%AuditEvent{}, command, event_type, :ok, %{
          environment_key: environment.key,
          links: %{"guardrail_decision_id" => decision.id},
          guardrail: first_guardrail_evidence(evaluated.signal_facts)
        })
      )
    end)
    |> Repo.transact()
    |> case do
      {:ok, %{decision: decision}} ->
        {:ok, guardrail_status_payload(decision, active_ruleset_version(flag_environment))}

      {:error, :decision, %Changeset{} = changeset, _changes} ->
        {:error, store_changeset_error(changeset, command.flag_key, command.environment_key)}

      {:error, _operation, reason, _changes} ->
        {:error, StoreError.unavailable(cause: reason)}
    end
  end

  defp rollback_from_stable_snapshot(
         command,
         environment,
         flag,
         flag_environment,
         active_ruleset,
         rollout_rule,
         evaluated,
         stable_target
       ) do
    published_at = now()
    snapshot = stable_target.authored_snapshot || %{}

    Multi.new()
    |> Multi.insert(
      :ruleset,
      Ruleset.changeset(%Ruleset{}, %{
        flag_environment_id: flag_environment.id,
        version: next_ruleset_version(flag_environment.id),
        status: :published,
        salt: snapshot["salt"] || snapshot[:salt] || active_ruleset.salt,
        published_at: published_at,
        metadata: snapshot["metadata"] || snapshot[:metadata] || %{},
        rules: snapshot["rules"] || snapshot[:rules] || []
      })
    )
    |> Multi.run(:flag_environment, fn repo, %{ruleset: ruleset} ->
      flag_environment
      |> FlagEnvironment.changeset(%{
        active_ruleset_id: ruleset.id,
        status: :active,
        last_published_at: published_at
      })
      |> repo.update()
    end)
    |> Multi.update(:flag, Changeset.change(flag, updated_at: published_at))
    |> Multi.run(:runtime_snapshot, fn repo, _changes ->
      insert_runtime_snapshot(repo, environment, published_at)
    end)
    |> Multi.insert(
      :decision,
      GuardrailDecision.changeset(%GuardrailDecision{}, %{
        flag_key: command.flag_key,
        environment_key: environment.key,
        rule_key: rollout_rule.key,
        stage: command.stage,
        tenant_key:
          command.metadata["tenant_key"] || get_in(command.metadata, ["tenant", "tenant_key"]),
        decision_state: :rollback_triggered,
        action_type: :rollback,
        decision_reason: evaluated.reason,
        effective_percentage: rollout_rule.rollout.percentage,
        rollout_salt: rollout_rule.rollout.salt,
        variant_fingerprint: variant_fingerprint(rollout_rule),
        monitoring_window_started_at: command.monitoring_window_started_at,
        monitoring_window_ends_at: command.monitoring_window_ends_at,
        occurred_at: published_at,
        signal_facts: Enum.map(evaluated.signal_facts, &SignalFact.metadata/1),
        guardrail_evidence: first_guardrail_evidence(evaluated.signal_facts),
        authored_snapshot: serialize_ruleset(active_ruleset),
        rollback_target_snapshot: stable_target.authored_snapshot,
        correlation_id: correlation_id(command),
        metadata: command.metadata
      })
    )
    |> Multi.run(:audit_event, fn repo, %{decision: decision, ruleset: ruleset} ->
      repo.insert(
        audit_event_changeset(%AuditEvent{}, command, "rollout.guardrail_rollback", :ok, %{
          environment_key: environment.key,
          before: ruleset_audit_state(active_ruleset),
          after: ruleset_audit_state(ruleset),
          diff: ruleset_position_diff(active_ruleset, ruleset),
          links: %{
            "guardrail_decision_id" => decision.id,
            "stable_guardrail_decision_id" => stable_target.id
          },
          guardrail: first_guardrail_evidence(evaluated.signal_facts)
        })
      )
    end)
    |> Repo.transact()
    |> case do
      {:ok, %{decision: decision, ruleset: ruleset}} ->
        {:ok, guardrail_status_payload(decision, ruleset.version)}

      {:error, :ruleset, %Changeset{} = changeset, _changes} ->
        {:error, ruleset_error(changeset, command.flag_key, command.environment_key)}

      {:error, :flag_environment, %Changeset{} = changeset, _changes} ->
        {:error, store_changeset_error(changeset, command.flag_key, command.environment_key)}

      {:error, :decision, %Changeset{} = changeset, _changes} ->
        {:error, store_changeset_error(changeset, command.flag_key, command.environment_key)}

      {:error, _operation, reason, _changes} ->
        {:error, StoreError.unavailable(cause: reason)}
    end
  end

  defp latest_guardrail_decision(%Command.FetchGuardrailStatus{} = command) do
    GuardrailDecision
    |> where(
      [decision],
      decision.flag_key == ^to_string(command.flag_key) and
        decision.environment_key == ^to_string(command.environment_key)
    )
    |> maybe_filter_guardrail_decision_rule(command.rule_key)
    |> maybe_filter_guardrail_decision_stage(command.stage)
    |> order_by([decision], desc: decision.occurred_at, desc: decision.inserted_at)
    |> limit(1)
    |> Repo.one()
  end

  defp latest_stable_guardrail_snapshot(
         flag_key,
         environment_key,
         rule_key,
         rollout_salt,
         variant_fingerprint,
         correlation_id
       ) do
    GuardrailDecision
    |> where(
      [decision],
      decision.flag_key == ^to_string(flag_key) and
        decision.environment_key == ^to_string(environment_key) and
        decision.rule_key == ^to_string(rule_key) and
        decision.decision_state == :healthy and
        decision.rollout_salt == ^rollout_salt and
        decision.variant_fingerprint == ^variant_fingerprint and
        not is_nil(decision.authored_snapshot) and
        decision.correlation_id != ^correlation_id
    )
    |> order_by([decision], desc: decision.occurred_at, desc: decision.inserted_at)
    |> limit(1)
    |> Repo.one()
  end

  defp maybe_filter_guardrail_decision_rule(query, nil), do: query

  defp maybe_filter_guardrail_decision_rule(query, rule_key),
    do: where(query, [decision], decision.rule_key == ^to_string(rule_key))

  defp maybe_filter_guardrail_decision_stage(query, nil), do: query

  defp maybe_filter_guardrail_decision_stage(query, stage),
    do: where(query, [decision], decision.stage == ^to_string(stage))

  defp guardrail_status_payload(%GuardrailDecision{} = decision, active_ruleset_version) do
    %{
      flag_key: decision.flag_key,
      environment_key: decision.environment_key,
      rule_key: decision.rule_key,
      stage: decision.stage,
      active_ruleset_version: active_ruleset_version,
      decision: GuardrailDecision.serialize(decision)
    }
  end

  defp first_guardrail_evidence([]), do: %{}
  defp first_guardrail_evidence([fact | _rest]), do: SignalFact.metadata(fact)

  defp variant_fingerprint(rule) do
    rule.variants
    |> Enum.map(fn variant ->
      %{
        key: variant.key,
        weight: variant.weight,
        value: variant.value
      }
    end)
    |> Jason.encode!()
  end

  defp finalize_scheduled_execution_success(
         scheduled_execution,
         attempt,
         command,
         execution_result
       ) do
    finished_at = now()

    automation_metadata =
      RolloutAutoAdvance.automation_execution_metadata(execution_result || %{})

    Multi.new()
    |> Multi.run(:attempt, fn repo, _changes ->
      update_execution_attempt_row(
        repo,
        attempt.id,
        "completed",
        finished_at,
        nil,
        command.metadata
      )
    end)
    |> Multi.run(:scheduled_execution, fn repo, _changes ->
      repo.update_all(
        from(se in "scheduled_executions",
          where: field(se, :id) == ^uuid_param(scheduled_execution.id)
        ),
        set: [
          state: "completed",
          attempt_count: scheduled_execution.attempt_count + 1,
          executed_at: finished_at,
          failure_reason: nil,
          execution_metadata:
            scheduled_execution.execution_metadata
            |> scheduled_transition_metadata("completed", command)
            |> Map.merge(automation_metadata),
          updated_at: finished_at
        ]
      )

      fetch_scheduled_execution_row(scheduled_execution.id)
    end)
    |> Multi.run(:audit_event, fn repo, %{scheduled_execution: updated} ->
      insert_scheduled_execution_audit_event(
        repo,
        updated,
        command,
        "scheduled_execution.succeeded",
        :ok,
        execution_result
      )
    end)
    |> Repo.transact()
    |> case do
      {:ok, %{scheduled_execution: updated, audit_event: audit_event}} ->
        if lifecycle_telemetry_enabled?(command) do
          emit_scheduled_execution_telemetry(:succeeded, command, updated, audit_event)
        end

        {:ok,
         %{
           scheduled_execution: serialize_scheduled_execution_row(updated),
           execution_result: execution_result,
           attempts:
             list_execution_attempt_rows(updated.id)
             |> Enum.map(&serialize_execution_attempt_row/1)
         }}

      {:error, _operation, reason, _changes} ->
        {:error, normalize_governance_failure(reason)}
    end
  end

  defp finalize_scheduled_execution_failure(
         scheduled_execution,
         attempt,
         command,
         %Rulestead.Error{} = error
       ) do
    finalize_scheduled_execution_failure(scheduled_execution, attempt, command, error.message)
  end

  defp finalize_scheduled_execution_failure(scheduled_execution, attempt, command, reason) do
    finished_at = now()
    failure_reason = normalize_failure_reason(reason)
    next_attempt_count = scheduled_execution.attempt_count + 1

    next_state =
      if next_attempt_count >= @scheduled_execution_retry_limit,
        do: "quarantined",
        else: "scheduled"

    attempt_state = if next_state == "quarantined", do: "quarantined", else: "failed"

    event_type =
      if next_state == "quarantined",
        do: "scheduled_execution.quarantined",
        else: "scheduled_execution.failed"

    telemetry_event = if next_state == "quarantined", do: :quarantined, else: :failed

    Multi.new()
    |> Multi.run(:attempt, fn repo, _changes ->
      update_execution_attempt_row(
        repo,
        attempt.id,
        attempt_state,
        finished_at,
        failure_reason,
        command.metadata
      )
    end)
    |> Multi.run(:scheduled_execution, fn repo, _changes ->
      repo.update_all(
        from(se in "scheduled_executions",
          where: field(se, :id) == ^uuid_param(scheduled_execution.id)
        ),
        set: [
          state: next_state,
          attempt_count: next_attempt_count,
          failure_reason: failure_reason,
          execution_metadata:
            scheduled_transition_metadata(
              scheduled_execution.execution_metadata,
              next_state,
              command
            ),
          updated_at: finished_at
        ]
      )

      fetch_scheduled_execution_row(scheduled_execution.id)
    end)
    |> Multi.run(:audit_event, fn repo, %{scheduled_execution: updated} ->
      insert_scheduled_execution_audit_event(repo, updated, command, event_type, :error)
    end)
    |> Repo.transact()
    |> case do
      {:ok, %{scheduled_execution: updated, audit_event: audit_event}} ->
        if lifecycle_telemetry_enabled?(command) do
          emit_scheduled_execution_telemetry(telemetry_event, command, updated, audit_event)
        end

        {:error, StoreError.invalid_command(failure_reason)}

      {:error, _operation, txn_reason, _changes} ->
        {:error, normalize_governance_failure(txn_reason)}
    end
  end

  defp insert_execution_attempt_row(
         scheduled_execution_id,
         attempt_number,
         state,
         started_at,
         failure_reason,
         metadata
       ) do
    attrs = %{
      scheduled_execution_id: uuid_param(scheduled_execution_id),
      attempt_number: attempt_number,
      state: state,
      started_at: started_at,
      finished_at: nil,
      failure_reason: failure_reason,
      metadata: metadata,
      inserted_at: started_at
    }

    case Repo.insert_all("execution_attempts", [attrs], returning: execution_attempt_fields()) do
      {1, [row]} -> {:ok, normalize_execution_attempt_row(row)}
      _ -> {:error, StoreError.unavailable()}
    end
  end

  defp update_execution_attempt_row(
         repo,
         attempt_id,
         state,
         finished_at,
         failure_reason,
         metadata
       ) do
    repo.update_all(
      from(attempt in "execution_attempts",
        where: field(attempt, :id) == ^uuid_param(attempt_id)
      ),
      set: [
        state: state,
        finished_at: finished_at,
        failure_reason: failure_reason,
        metadata: metadata
      ]
    )

    case from(attempt in "execution_attempts",
           where: field(attempt, :id) == ^uuid_param(attempt_id),
           select: map(attempt, ^execution_attempt_fields())
         )
         |> repo.one() do
      nil -> {:error, StoreError.invalid_command("execution attempt was not found")}
      row -> {:ok, normalize_execution_attempt_row(row)}
    end
  end

  defp fetch_scheduled_execution_row(scheduled_execution_id) do
    case from(se in "scheduled_executions",
           where: field(se, :id) == ^uuid_param(scheduled_execution_id),
           select: map(se, ^scheduled_execution_fields())
         )
         |> Repo.one() do
      nil -> {:error, StoreError.invalid_command("scheduled execution was not found")}
      row -> {:ok, normalize_scheduled_execution_row(row)}
    end
  end

  defp fetch_idempotent_scheduled_execution(nil), do: :not_found

  defp fetch_idempotent_scheduled_execution(idempotency_key) do
    case from(se in "scheduled_executions",
           where: field(se, :idempotency_key) == ^idempotency_key,
           select: map(se, ^scheduled_execution_fields())
         )
         |> Repo.one() do
      nil ->
        :not_found

      row ->
        existing = normalize_scheduled_execution_row(row)

        if existing.state in ["scheduled", "running", "completed"] do
          {:ok, existing}
        else
          :not_found
        end
    end
  end

  defp list_execution_attempt_rows(scheduled_execution_id) do
    from(attempt in "execution_attempts",
      where: field(attempt, :scheduled_execution_id) == ^uuid_param(scheduled_execution_id),
      order_by: [asc: field(attempt, :attempt_number)],
      select: map(attempt, ^execution_attempt_fields())
    )
    |> Repo.all()
    |> Enum.map(&normalize_execution_attempt_row/1)
  end

  defp approved_snapshot(change_request_id) do
    {:ok,
     from(a in "approvals",
       where:
         field(a, :change_request_id) == ^uuid_param(change_request_id) and
           field(a, :decision) == "approved",
       order_by: [asc: field(a, :reviewed_at)],
       select: map(a, [:reviewer_id, :reviewer_type, :reviewer_display])
     )
     |> Repo.all()
     |> Enum.map(fn approval ->
       %{
         "id" => approval.reviewer_id,
         "type" => approval.reviewer_type,
         "display" => approval.reviewer_display
       }
     end)}
  end

  defp emit_governance_telemetry(event, command, change_request, audit_event) do
    change_request_map = if is_map(change_request), do: change_request, else: %{}
    audit_event_map = if is_map(audit_event), do: audit_event, else: %{}

    Telemetry.execute(
      [:rulestead, :admin, :change_request, event],
      %{count: 1},
      Telemetry.metadata(
        Telemetry.governance_metadata(command, %{
          event: event,
          action: governance_action(Map.get(change_request_map, :governed_action)),
          environment_key: Map.get(change_request_map, :environment_key),
          resource_key: Map.get(change_request_map, :resource_key),
          change_request_id: Map.get(change_request_map, :id),
          correlation_id: Map.get(change_request_map, :correlation_id),
          audit_event_id: Map.get(audit_event_map, :id)
        })
      )
    )
  end

  defp insert_scheduled_execution_audit_event(
         repo,
         scheduled_execution,
         command,
         event_type,
         result,
         execution_result \\ %{}
       ) do
    %AuditEvent{}
    |> scheduled_execution_audit_changeset(
      scheduled_execution,
      command,
      event_type,
      result,
      execution_result
    )
    |> repo.insert()
  end

  defp scheduled_execution_audit_changeset(
         audit_event,
         scheduled_execution,
         command,
         event_type,
         result,
         execution_result
       ) do
    automation_meta = RolloutAutoAdvance.automation_audit_metadata(execution_result || %{})

    change_request_id =
      scheduled_execution.change_request_id || Map.get(automation_meta, "change_request_id")

    AuditEvent.changeset(audit_event, %{
      event_type: event_type,
      resource_type: scheduled_execution.resource_type || "flag",
      resource_key: scheduled_execution.resource_key,
      environment_key: scheduled_execution.environment_key,
      actor_id: actor_value(command.actor, "id"),
      actor_type: to_string(actor_value(command.actor, "type") || "system"),
      actor_display: actor_value(command.actor, "display"),
      reason: command.reason,
      result: result,
      metadata:
        AuditEvent.metadata(%{
          tenant:
            Command.GovernanceSupport.tenant_provenance(
              command,
              fallback: scheduled_execution.command_snapshot
            ),
          context:
            scheduled_execution_audit_context(scheduled_execution, command)
            |> Map.merge(automation_meta),
          request_id: scheduled_execution.correlation_id,
          source: scheduled_execution_source(scheduled_execution, command),
          change_request_id: change_request_id,
          governance_action: scheduled_execution.governed_action,
          execution_stage: scheduled_execution_stage(event_type),
          scheduled_execution_id: scheduled_execution.id,
          attempt_count: scheduled_execution.attempt_count,
          scheduled_for: scheduled_execution.scheduled_for,
          executed_at: scheduled_execution.executed_at,
          failure_reason: scheduled_execution.failure_reason,
          execution_mode: scheduled_execution.execution_mode,
          executed_by: executed_by_value(command.actor),
          scheduled_by: scheduled_by_payload(scheduled_execution),
          approved_by: normalize_array_map(scheduled_execution.approved_by_snapshot)
        }),
      correlation_id: scheduled_execution.correlation_id,
      occurred_at: now()
    })
  end

  defp scheduled_execution_audit_context(scheduled_execution, command) do
    scheduled_execution.metadata
    |> Map.new()
    |> Map.merge(Map.get(command, :metadata, %{}) |> Map.new())
    |> Map.put("scheduled_execution_id", scheduled_execution.id)
    |> Map.put("environment_key", scheduled_execution.environment_key)
    |> Map.put("governed_action", scheduled_execution.governed_action)
    |> Map.put("execution_mode", scheduled_execution.execution_mode)
  end

  defp scheduled_execution_stage("scheduled_execution.scheduled"), do: "scheduled"
  defp scheduled_execution_stage("scheduled_execution.cancelled"), do: "cancelled"
  defp scheduled_execution_stage("scheduled_execution.requeued"), do: "requeued"
  defp scheduled_execution_stage(_event_type), do: "execute"

  defp scheduled_execution_source(scheduled_execution, command) do
    command.metadata[:source] || command.metadata["source"] ||
      scheduled_execution.metadata["source"]
  end

  defp scheduled_by_payload(scheduled_execution) do
    %{
      "id" => scheduled_execution.scheduled_by_id,
      "type" => scheduled_execution.scheduled_by_type,
      "display" => scheduled_execution.scheduled_by_display
    }
  end

  defp executed_by_value(actor) when is_map(actor) do
    case actor_value(actor, "id") do
      "system:scheduler" -> "scheduler"
      "scheduler" -> "scheduler"
      _other -> "scheduler"
    end
  end

  defp executed_by_value(_actor), do: "scheduler"

  defp emit_scheduled_execution_telemetry(
         event,
         command,
         scheduled_execution,
         audit_event,
         extra \\ []
       ) do
    Telemetry.execute(
      Telemetry.scheduled_execution_event(event),
      %{count: 1},
      Telemetry.metadata(
        Telemetry.scheduled_execution_metadata(
          serialize_scheduled_execution_row(scheduled_execution),
          %{
            action: governance_action(scheduled_execution.governed_action),
            environment_key: scheduled_execution.environment_key,
            attempt_count: Keyword.get(extra, :attempt_count, scheduled_execution.attempt_count),
            audit_event_id: audit_event && audit_event.id,
            executed_by: executed_by_value(command.actor),
            event: event
          }
        )
      )
    )
  end

  defp lifecycle_telemetry_enabled?(command) do
    Map.get(
      command.metadata,
      :emit_lifecycle_telemetry,
      Map.get(command.metadata, "emit_lifecycle_telemetry", true)
    ) != false
  end

  defp normalize_governance_failure(%Rulestead.Error{} = error), do: error

  defp normalize_governance_failure(%Changeset{} = changeset),
    do: StoreError.unavailable(cause: changeset)

  defp normalize_governance_failure(other), do: StoreError.unavailable(cause: other)

  defp actor_value(nil, _key), do: nil
  defp actor_value(actor, key), do: Map.get(actor, key) || Map.get(actor, String.to_atom(key))

  defp normalize_approval_requirement_snapshot(snapshot) do
    %{
      action: governance_action(snapshot["action"] || snapshot[:action] || "manage_settings"),
      environment_key: snapshot["environment_key"] || snapshot[:environment_key],
      required_approvals: snapshot["required_approvals"] || snapshot[:required_approvals] || 0,
      change_request_required?:
        snapshot["change_request_required?"] || snapshot[:change_request_required?] || false,
      self_approval_allowed?:
        snapshot["self_approval_allowed?"] || snapshot[:self_approval_allowed?] || false
    }
  end

  defp normalize_governance_row(row) when is_map(row) do
    row
    |> maybe_normalize_uuid(:id)
    |> maybe_normalize_uuid(:change_request_id)
  end

  defp normalize_scheduled_execution_row(row) when is_map(row) do
    row
    |> maybe_normalize_uuid(:id)
    |> maybe_normalize_uuid(:change_request_id)
    |> maybe_normalize_datetime(:scheduled_for)
    |> maybe_normalize_datetime(:executed_at)
    |> maybe_normalize_datetime(:inserted_at)
    |> maybe_normalize_datetime(:updated_at)
    |> Map.update(:approved_by_snapshot, [], &normalize_array_map/1)
  end

  defp maybe_normalize_datetime(row, key) do
    case Map.get(row, key) do
      %NaiveDateTime{} = ndt -> Map.put(row, key, DateTime.from_naive!(ndt, "Etc/UTC"))
      _ -> row
    end
  end

  defp normalize_execution_attempt_row(row) when is_map(row) do
    row
    |> maybe_normalize_uuid(:id)
    |> maybe_normalize_uuid(:scheduled_execution_id)
    |> maybe_normalize_datetime(:started_at)
    |> maybe_normalize_datetime(:finished_at)
    |> maybe_normalize_datetime(:inserted_at)
  end

  defp serialize_scheduled_execution_row(row) do
    %{
      id: row.id,
      state: scheduled_state(row.state),
      action: governance_action(row.governed_action),
      change_request_id: row.change_request_id,
      environment_key: row.environment_key,
      resource_type: row.resource_type,
      resource_key: row.resource_key,
      execution_mode: scheduled_execution_mode(row.execution_mode),
      scheduled_by: %{
        "id" => row.scheduled_by_id,
        "type" => row.scheduled_by_type,
        "display" => row.scheduled_by_display
      },
      approved_by_snapshot: normalize_array_map(row.approved_by_snapshot),
      execution_metadata: row.execution_metadata || %{},
      scheduled_for: row.scheduled_for,
      executed_at: row.executed_at,
      attempt_count: row.attempt_count,
      failure_reason: row.failure_reason,
      last_oban_job_id: row.last_oban_job_id,
      correlation_id: row.correlation_id,
      idempotency_key: row.idempotency_key,
      command_snapshot: row.command_snapshot || %{},
      approval_requirement_snapshot: row.approval_requirement_snapshot || %{},
      metadata: row.metadata || %{}
    }
    |> ScheduledExecution.new()
    |> ScheduledExecution.serialize()
    |> Map.put(:id, row.id)
  end

  defp serialize_execution_attempt_row(row) do
    %{
      id: row.id,
      scheduled_execution_id: row.scheduled_execution_id,
      attempt_number: row.attempt_number,
      state: execution_attempt_state(row.state),
      started_at: row.started_at,
      finished_at: row.finished_at,
      failure_reason: row.failure_reason,
      metadata: row.metadata || %{}
    }
    |> ExecutionAttempt.new()
    |> ExecutionAttempt.serialize()
    |> Map.put(:id, row.id)
  end

  defp scheduled_execution_fields do
    [
      :id,
      :state,
      :change_request_id,
      :governed_action,
      :environment_key,
      :resource_type,
      :resource_key,
      :execution_mode,
      :scheduled_by_id,
      :scheduled_by_type,
      :scheduled_by_display,
      :approved_by_snapshot,
      :execution_metadata,
      :scheduled_for,
      :executed_at,
      :attempt_count,
      :failure_reason,
      :last_oban_job_id,
      :command_snapshot,
      :approval_requirement_snapshot,
      :metadata,
      :correlation_id,
      :idempotency_key
    ]
  end

  defp execution_attempt_fields do
    [
      :id,
      :scheduled_execution_id,
      :attempt_number,
      :state,
      :started_at,
      :finished_at,
      :failure_reason,
      :metadata
    ]
  end

  defp scheduled_state("scheduled"), do: :scheduled
  defp scheduled_state("running"), do: :running
  defp scheduled_state("completed"), do: :completed
  defp scheduled_state("failed"), do: :failed
  defp scheduled_state("quarantined"), do: :quarantined
  defp scheduled_state("cancelled"), do: :cancelled
  defp scheduled_state(_state), do: :scheduled

  defp execution_attempt_state("running"), do: :running
  defp execution_attempt_state("completed"), do: :completed
  defp execution_attempt_state("failed"), do: :failed
  defp execution_attempt_state("quarantined"), do: :quarantined
  defp execution_attempt_state("cancelled"), do: :cancelled
  defp execution_attempt_state(_state), do: :failed

  defp scheduled_execution_mode("change_request"), do: :change_request
  defp scheduled_execution_mode("policy_bypass"), do: :policy_bypass
  defp scheduled_execution_mode("emergency_bypass"), do: :emergency_bypass
  defp scheduled_execution_mode(_mode), do: :change_request

  defp ensure_scheduled_transition(state, allowed_states) do
    if state in allowed_states do
      :ok
    else
      {:error,
       StoreError.invalid_command(
         "scheduled execution is not in a valid state for this operation"
       )}
    end
  end

  defp maybe_filter_scheduled_execution(query, _field, nil), do: query

  defp maybe_filter_scheduled_execution(query, field_name, value) do
    where(
      query,
      [scheduled_execution],
      field(scheduled_execution, ^field_name) == ^to_string(value)
    )
  end

  defp maybe_filter_scheduled_state(query, nil), do: query

  defp maybe_filter_scheduled_state(query, value),
    do: where(query, [se], field(se, :state) == ^normalize_change_request_filter(value))

  defp maybe_filter_scheduled_action(query, nil), do: query

  defp maybe_filter_scheduled_action(query, value),
    do: where(query, [se], field(se, :governed_action) == ^normalize_change_request_filter(value))

  defp maybe_filter_scheduled_window(query, :after, nil), do: query

  defp maybe_filter_scheduled_window(query, :after, %DateTime{} = value),
    do: where(query, [se], field(se, :scheduled_for) >= ^value)

  defp maybe_filter_scheduled_window(query, :before, nil), do: query

  defp maybe_filter_scheduled_window(query, :before, %DateTime{} = value),
    do: where(query, [se], field(se, :scheduled_for) <= ^value)

  defp scheduled_transition_metadata(existing, state, command) do
    Map.merge(existing || %{}, %{
      "last_transition" => state,
      "last_transition_at" => DateTime.to_iso8601(now()),
      "last_actor" => command.actor || %{},
      "last_reason" => command.reason,
      "request_id" => command.metadata[:request_id] || command.metadata["request_id"]
    })
  end

  defp normalize_failure_reason(%Rulestead.Error{message: message}),
    do: normalize_failure_reason(message)

  defp normalize_failure_reason(reason) when is_binary(reason) do
    case String.trim(reason) do
      "" -> "scheduled execution failed"
      value -> value
    end
  end

  defp normalize_failure_reason(reason), do: inspect(reason)

  defp normalize_array_map(values) when is_list(values), do: Enum.map(values, &Map.new/1)
  defp normalize_array_map(_values), do: []

  defp maybe_normalize_uuid(row, key) do
    case Map.get(row, key) do
      value when is_binary(value) and byte_size(value) == 16 ->
        case Ecto.UUID.load(value) do
          {:ok, uuid} -> Map.put(row, key, uuid)
          :error -> row
        end

      _ ->
        row
    end
  end

  defp uuid_param(value) do
    case Ecto.UUID.dump(value) do
      {:ok, dumped} -> dumped
      :error -> value
    end
  end

  defp diff_map(before_state, after_state) do
    Map.new(after_state, fn {key, value} ->
      before_value = Map.get(before_state, key) || Map.get(before_state, to_string(key))
      {to_string(key), %{"from" => before_value, "to" => value}}
    end)
  end

  defp maybe_filter_audit_flag(query, nil), do: query

  defp maybe_filter_audit_flag(query, flag_key),
    do: where(query, [event], event.resource_key == ^to_string(flag_key))

  defp maybe_filter_audit_environment(query, nil), do: query

  defp maybe_filter_audit_environment(query, environment_key),
    do: where(query, [event], event.environment_key == ^to_string(environment_key))

  defp maybe_filter_audit_actor_id(query, nil), do: query

  defp maybe_filter_audit_actor_id(query, actor_id),
    do: where(query, [event], event.actor_id == ^actor_id)

  defp maybe_filter_audit_mutation(query, nil), do: query

  defp maybe_filter_audit_mutation(query, mutation),
    do: where(query, [event], event.event_type == ^mutation)

  defp maybe_filter_audit_occurred_after(query, %DateTime{} = occurred_after),
    do: where(query, [event], event.occurred_at >= ^occurred_after)

  defp maybe_filter_audit_occurred_after(query, _occurred_after), do: query

  defp maybe_filter_audit_occurred_before(query, %DateTime{} = occurred_before),
    do: where(query, [event], event.occurred_at <= ^occurred_before)

  defp maybe_filter_audit_occurred_before(query, _occurred_before), do: query

  defp active_ruleset(%{active_ruleset_id: nil}), do: nil

  defp active_ruleset(%{active_ruleset_id: active_ruleset_id}) do
    Repo.get(Ruleset, active_ruleset_id)
  end

  defp active_ruleset_version(%FlagEnvironment{active_ruleset: %Ruleset{version: version}}),
    do: version

  defp active_ruleset_version(%FlagEnvironment{}), do: nil

  defp ruleset_audit_metadata(previous_ruleset, ruleset) do
    before = ruleset_audit_state(previous_ruleset)
    after_state = ruleset_audit_state(ruleset)

    %{
      before: before,
      after: after_state,
      diff: ruleset_position_diff(before, after_state)
    }
  end

  defp ruleset_audit_state(nil), do: %{rules: []}

  defp ruleset_audit_state(ruleset) do
    %{
      rules:
        ruleset.rules
        |> Enum.with_index()
        |> Enum.map(fn {rule, position} ->
          %{key: rule.key, position: position}
        end)
    }
  end

  defp ruleset_position_diff(before_state, after_state) do
    before_positions =
      before_state
      |> normalize_ruleset_position_state()
      |> Map.get(:rules, [])
      |> Map.new(fn %{key: key, position: position} -> {key, position} end)

    %{
      rules:
        after_state
        |> normalize_ruleset_position_state()
        |> Map.get(:rules, [])
        |> Enum.map(fn %{key: key, position: position} ->
          %{key: key, from: Map.get(before_positions, key), to: position}
        end)
    }
  end

  defp normalize_ruleset_position_state(%{rules: rules}) when is_list(rules) do
    case rules do
      [%{key: _key, position: _position} | _rest] ->
        %{rules: rules}

      _other ->
        %{
          rules:
            rules
            |> Enum.with_index()
            |> Enum.map(fn {rule, position} ->
              %{key: Map.get(rule, :key) || Map.get(rule, "key"), position: position}
            end)
        }
    end
  end

  defp now, do: DateTime.utc_now() |> DateTime.truncate(:microsecond)

  defp decorate_payload(payload, flag, environment, flag_environment, lifecycle_context) do
    environment_cards = environment_cards(flag, lifecycle_context)

    payload
    |> Map.put(:lifecycle, lifecycle(flag, flag_environment, lifecycle_context))
    |> Map.put(:has_draft_ruleset?, payload.draft_rulesets != [])
    |> Map.put(:recent_owners, recent_owners(flag.ownership.owner_ref))
    |> Map.put(:environments, Enum.map(environment_cards, & &1.environment))
    |> Map.put(:environment_cards, environment_cards)
    |> Map.put(:environment_status, flag_environment.status)
    |> Map.put(:environment_key, environment.key)
  end

  defp environment_cards(flag, lifecycle_context) do
    flag.flag_environments
    |> Enum.sort_by(& &1.environment.key)
    |> Enum.map(fn flag_environment ->
      drafts = draft_ruleset_payloads(flag_environment)

      %{
        environment: environment_summary(flag_environment.environment),
        flag_environment: flag_environment_summary(flag_environment),
        active_ruleset: active_ruleset_payload(flag_environment),
        draft_rulesets: drafts,
        has_draft_ruleset?: drafts != [],
        lifecycle: lifecycle(flag, flag_environment, lifecycle_context)
      }
    end)
  end

  defp lifecycle(flag, flag_environment, lifecycle_context) do
    Lifecycle.classify(
      flag_summary(flag),
      flag_environment_summary(flag_environment),
      lifecycle_opts(flag, lifecycle_context)
    )
  end

  defp lifecycle_opts(flag, lifecycle_context) do
    evidence = Map.get(lifecycle_context, flag.key, %{})

    Application.get_env(:rulestead, :admin_lifecycle, [])
    |> Keyword.put(:code_reference_count, Map.get(evidence, :code_reference_count))
    |> Keyword.put(:code_refs_scan, Map.get(lifecycle_context, :code_refs_scan))
  end

  defp lifecycle_context_for_flags(flag_keys) do
    counts = code_reference_counts(flag_keys)
    scan = latest_code_refs_scan()

    Enum.reduce(flag_keys, %{code_refs_scan: scan}, fn flag_key, acc ->
      Map.put(acc, flag_key, %{code_reference_count: Map.get(counts, flag_key, 0)})
    end)
  end

  defp latest_code_refs_scan do
    ScanReceipt.latest_query()
    |> Repo.one()
    |> case do
      nil -> nil
      receipt -> %{received_at: receipt.received_at, reference_count: receipt.reference_count}
    end
  end

  defp code_reference_counts(flag_keys) do
    keys = flag_keys |> Enum.uniq() |> Enum.reject(&is_nil/1)

    from(code_reference in CodeReference,
      where: code_reference.flag_key in ^keys,
      group_by: code_reference.flag_key,
      select: {code_reference.flag_key, count(code_reference.id)}
    )
    |> Repo.all()
    |> Map.new()
  end

  defp recent_owners(current_owner, extra_owner \\ nil) do
    owners =
      from(flag in Flag,
        order_by: [desc: flag.updated_at],
        select: fragment("?->>'owner_ref'", flag.ownership)
      )
      |> Repo.all()

    [
      normalize_owner(current_owner),
      normalize_owner(extra_owner) | Enum.map(owners, &normalize_owner/1)
    ]
    |> Enum.reject(&is_nil/1)
    |> Enum.uniq()
    |> Enum.take(5)
  end

  defp preferred_environment(flag) do
    flag.flag_environments
    |> Enum.sort_by(fn flag_environment ->
      {flag_environment.environment.key != "test", flag_environment.environment.key}
    end)
    |> List.first()
    |> then(fn flag_environment -> {flag_environment.environment, flag_environment} end)
  end

  defp maybe_filter_owner(entries, nil), do: entries
  defp maybe_filter_owner(entries, ""), do: entries

  defp maybe_filter_owner(entries, owner) do
    normalized = normalize_owner(owner)

    Enum.filter(entries, fn entry ->
      ownership = Map.get(entry.flag, :ownership) || %{}

      normalized in [
        normalize_owner(Map.get(ownership, :owner_ref) || Map.get(ownership, "owner_ref")),
        normalize_owner(Map.get(ownership, :owner_display) || Map.get(ownership, "owner_display"))
      ]
    end)
  end

  defp maybe_filter_tags(entries, []), do: entries

  defp maybe_filter_tags(entries, tags) do
    normalized_tags = tags |> Enum.map(&normalize_tag/1) |> Enum.reject(&is_nil/1)

    Enum.filter(entries, fn entry ->
      entry_tags = entry.flag.tags |> Enum.map(&normalize_tag/1) |> Enum.reject(&is_nil/1)
      Enum.all?(normalized_tags, &(&1 in entry_tags))
    end)
  end

  defp maybe_filter_lifecycle(entries, nil), do: entries

  defp maybe_filter_lifecycle(entries, lifecycle_state) do
    Enum.filter(entries, fn entry ->
      entry.lifecycle.state == lifecycle_state
    end)
  end

  defp maybe_filter_stale(entries, nil), do: entries

  defp maybe_filter_stale(entries, stale_state) do
    Enum.filter(entries, fn entry ->
      case entry.lifecycle.freshness.state do
        :active -> stale_state == :fresh
        :potentially_stale -> stale_state == :potentially_stale
        :stale -> stale_state == :stale
        :archived -> false
      end
    end)
  end

  defp maybe_filter_readiness(entries, nil), do: entries

  defp maybe_filter_readiness(entries, readiness) do
    Enum.filter(entries, &(&1.lifecycle.archive_readiness.readiness == readiness))
  end

  defp maybe_filter_evidence_quality(entries, nil), do: entries

  defp maybe_filter_evidence_quality(entries, evidence_quality) do
    Enum.filter(entries, &(&1.lifecycle.archive_readiness.evidence_quality == evidence_quality))
  end

  defp sort_entries(entries, :inserted_at) do
    Enum.sort(entries, fn left, right ->
      compare_datetime_desc(left.flag.inserted_at, right.flag.inserted_at, left, right)
    end)
  end

  defp sort_entries(entries, :updated_at) do
    Enum.sort(entries, fn left, right ->
      compare_datetime_desc(left.flag.updated_at, right.flag.updated_at, left, right)
    end)
  end

  defp sort_entries(entries, _sort) do
    Enum.sort_by(entries, fn entry -> {entry.flag.key, entry.environment.key} end)
  end

  defp paginate_entries(entries, command) do
    filtered_entries =
      entries
      |> apply_cursor(command.after, :after, command.sort)
      |> apply_cursor(command.before, :before, command.sort)

    page_entries = Enum.take(filtered_entries, command.limit)
    first_entry = List.first(page_entries)
    last_entry = List.last(page_entries)

    %Command.Page{
      entries: page_entries,
      limit: command.limit,
      next_cursor:
        if(length(filtered_entries) > command.limit and last_entry,
          do: encode_cursor(last_entry, command.sort)
        ),
      prev_cursor:
        if((command.after || command.before) && first_entry,
          do: encode_cursor(first_entry, command.sort)
        ),
      has_next_page?: length(filtered_entries) > command.limit,
      has_previous_page?: not is_nil(command.after) or not is_nil(command.before)
    }
  end

  defp apply_cursor(entries, nil, _direction, _sort), do: entries

  defp apply_cursor(entries, cursor, direction, sort) do
    with {:ok, decoded} <- decode_cursor(cursor) do
      Enum.filter(entries, fn entry -> compare_cursor(entry, decoded, sort, direction) end)
    else
      _ -> entries
    end
  end

  defp compare_cursor(entry, decoded, :inserted_at, :after),
    do:
      {entry.flag.inserted_at, entry.flag.key, entry.environment.key} <
        {decoded.sort_value, decoded.flag_key, decoded.environment_key}

  defp compare_cursor(entry, decoded, :inserted_at, :before),
    do:
      {entry.flag.inserted_at, entry.flag.key, entry.environment.key} >
        {decoded.sort_value, decoded.flag_key, decoded.environment_key}

  defp compare_cursor(entry, decoded, :updated_at, :after),
    do:
      {entry.flag.updated_at, entry.flag.key, entry.environment.key} <
        {decoded.sort_value, decoded.flag_key, decoded.environment_key}

  defp compare_cursor(entry, decoded, :updated_at, :before),
    do:
      {entry.flag.updated_at, entry.flag.key, entry.environment.key} >
        {decoded.sort_value, decoded.flag_key, decoded.environment_key}

  defp compare_cursor(entry, decoded, _sort, :after),
    do: {entry.flag.key, entry.environment.key} > {decoded.flag_key, decoded.environment_key}

  defp compare_cursor(entry, decoded, _sort, :before),
    do: {entry.flag.key, entry.environment.key} < {decoded.flag_key, decoded.environment_key}

  defp encode_cursor(entry, sort) do
    %{
      sort: sort,
      sort_value:
        case sort do
          :inserted_at -> entry.flag.inserted_at
          :updated_at -> entry.flag.updated_at
          _ -> entry.flag.key
        end,
      flag_key: entry.flag.key,
      environment_key: entry.environment.key
    }
    |> :erlang.term_to_binary()
    |> Base.url_encode64(padding: false)
  end

  defp decode_cursor(cursor) do
    try do
      with {:ok, binary} <- Base.url_decode64(cursor, padding: false) do
        {:ok, :erlang.binary_to_term(binary)}
      end
    rescue
      _ -> :error
    end
  end

  defp maybe_filter_environment_query(query, nil), do: query
  defp maybe_filter_environment_query(query, ""), do: query

  defp maybe_filter_environment_query(query, search) do
    normalized = "%" <> String.downcase(String.trim(to_string(search))) <> "%"

    where(
      query,
      [environment],
      ilike(environment.key, ^normalized) or
        ilike(environment.name, ^normalized) or
        ilike(fragment("coalesce(?, '')", environment.description), ^normalized)
    )
  end

  defp maybe_filter_archived_audiences(query, true), do: query

  defp maybe_filter_archived_audiences(query, false) do
    where(query, [audience], is_nil(audience.archived_at))
  end

  defp maybe_filter_audience_query(query, nil), do: query
  defp maybe_filter_audience_query(query, ""), do: query

  defp maybe_filter_audience_query(query, search) do
    normalized = "%#{String.trim(search)}%"

    where(
      query,
      [audience],
      ilike(audience.key, ^normalized) or
        ilike(fragment("coalesce(?, '')", audience.description), ^normalized)
    )
  end

  defp create_environments([]) do
    environments =
      from(environment in Environment, order_by: [asc: environment.key])
      |> Repo.all()

    case environments do
      [] -> {:error, StoreError.environment_not_found(:all)}
      values -> {:ok, values}
    end
  end

  defp create_environments(environment_keys) do
    normalized_keys = environment_keys |> Enum.map(&to_string/1) |> Enum.uniq()

    environments =
      from(environment in Environment,
        where: environment.key in ^normalized_keys,
        order_by: [asc: environment.key]
      )
      |> Repo.all()

    case normalized_keys -- Enum.map(environments, & &1.key) do
      [] -> {:ok, environments}
      [missing | _] -> {:error, StoreError.environment_not_found(missing)}
    end
  end

  defp insert_flag_environments(repo, flag, environments) do
    Enum.reduce_while(environments, {:ok, []}, fn environment, {:ok, acc} ->
      attrs = %{flag_id: flag.id, environment_id: environment.id, status: :draft}

      case repo.insert(FlagEnvironment.changeset(%FlagEnvironment{}, attrs)) do
        {:ok, flag_environment} ->
          {:cont, {:ok, [flag_environment | acc]}}

        {:error, %Changeset{} = changeset} ->
          {:halt, {:error, store_changeset_error(changeset, flag.key, environment.key)}}
      end
    end)
  end

  defp normalize_owner(owner) when is_binary(owner) do
    case String.trim(owner) do
      "" -> nil
      normalized -> normalized
    end
  end

  defp normalize_owner(_owner), do: nil

  defp normalize_tag(tag) when is_binary(tag) do
    case String.trim(tag) do
      "" -> nil
      normalized -> normalized
    end
  end

  defp normalize_tag(_tag), do: nil

  defp maybe_put_update_field(attrs, _key, nil), do: attrs
  defp maybe_put_update_field(attrs, key, value), do: Map.put(attrs, key, value)

  defp maybe_put_lifecycle_update(attrs, command) do
    if not is_nil(command.lifecycle) do
      Map.put(attrs, :lifecycle, command.lifecycle)
    else
      attrs
    end
  end

  defp compare_datetime_desc(left_datetime, right_datetime, left, right) do
    case DateTime.compare(left_datetime, right_datetime) do
      :gt -> true
      :lt -> false
      :eq -> {left.flag.key, left.environment.key} <= {right.flag.key, right.environment.key}
    end
  end

  defp flag_by_key_query(flag_key) do
    from(flag in Flag,
      where: flag.key == ^to_string(flag_key),
      preload: [flag_environments: [:environment, :active_ruleset]]
    )
  end

  defp flag_with_environment_query(flag_key, environment_key) do
    from(flag in Flag,
      where: flag.key == ^to_string(flag_key),
      join: fe in assoc(flag, :flag_environments),
      on: fe.flag_id == flag.id,
      join: env in assoc(fe, :environment),
      on: env.id == fe.environment_id and env.key == ^to_string(environment_key),
      preload: [flag_environments: {fe, [:environment, :active_ruleset]}]
    )
  end

  defp environment_snapshot_flags_query(environment_key) do
    from(flag in Flag,
      where: is_nil(flag.archived_at),
      join: fe in assoc(flag, :flag_environments),
      on:
        fe.flag_id == flag.id and fe.status in [:active, :killswitched] and
          not is_nil(fe.active_ruleset_id),
      join: env in assoc(fe, :environment),
      on: env.id == fe.environment_id and env.key == ^to_string(environment_key),
      order_by: [asc: flag.key],
      preload: [flag_environments: {fe, [:environment, :active_ruleset]}]
    )
  end

  defp runtime_snapshot_query(%Command.FetchSnapshot{version: nil}, environment_key) do
    from(snapshot in RuntimeSnapshot,
      where: snapshot.environment_key == ^environment_key,
      order_by: [desc: snapshot.version],
      limit: 1
    )
  end

  defp runtime_snapshot_query(%Command.FetchSnapshot{version: version}, environment_key) do
    from(snapshot in RuntimeSnapshot,
      where: snapshot.environment_key == ^environment_key and snapshot.version == ^version,
      limit: 1
    )
  end

  defp snapshot_lookup_metadata(environment_key, nil), do: %{environment_key: environment_key}

  defp snapshot_lookup_metadata(environment_key, version) do
    %{environment_key: environment_key, version: version}
  end

  defp runtime_ruleset_payload(nil, _flag_environment), do: nil

  defp runtime_ruleset_payload(ruleset, %{status: :killswitched}) do
    %{ruleset | rules: []}
  end

  defp runtime_ruleset_payload(ruleset, _flag_environment), do: ruleset

  defp webhook_receipt_fields do
    [
      :id,
      :provider,
      :endpoint_key,
      :delivery_id,
      :attempt_id,
      :topic,
      :occurred_at,
      :received_at,
      :raw_body_sha256,
      :verification_metadata,
      :normalized_payload,
      :dedupe_key,
      :verified_state,
      :rejection_reason,
      :correlation_id,
      :change_request_id,
      :scheduled_execution_id,
      :inserted_at,
      :updated_at
    ]
  end

  defp normalize_webhook_receipt_row(row) when is_map(row) do
    row
    |> maybe_normalize_uuid(:id)
    |> maybe_normalize_uuid(:change_request_id)
    |> maybe_normalize_uuid(:scheduled_execution_id)
  end

  defp serialize_webhook_receipt_row(row) do
    %{
      id: row.id,
      provider: row.provider,
      endpoint_key: row.endpoint_key,
      delivery_id: row.delivery_id,
      attempt_id: row.attempt_id,
      topic: row.topic,
      occurred_at: row.occurred_at,
      received_at: row.received_at,
      raw_body_sha256: row.raw_body_sha256,
      verification_metadata: row.verification_metadata || %{},
      normalized_payload: row.normalized_payload || %{},
      dedupe_key: row.dedupe_key,
      verified_state: String.to_existing_atom(row.verified_state),
      rejection_reason: row.rejection_reason,
      correlation_id: row.correlation_id,
      change_request_id: row.change_request_id,
      scheduled_execution_id: row.scheduled_execution_id,
      inserted_at: row.inserted_at,
      updated_at: row.updated_at
    }
  end

  defp maybe_filter_webhook_record(query, _field, nil), do: query

  defp maybe_filter_webhook_record(query, field_name, value) do
    where(query, [wr], field(wr, ^field_name) == ^to_string(value))
  end

  defp normalize_webhook_state_filter(nil), do: nil
  defp normalize_webhook_state_filter(value) when is_atom(value), do: Atom.to_string(value)
  defp normalize_webhook_state_filter(value) when is_binary(value), do: String.trim(value)

  defp fetch_destination_by_id(id) do
    case Repo.get(Destination, id) do
      nil -> {:error, StoreError.invalid_command("webhook destination was not found")}
      destination -> {:ok, destination}
    end
  end

  defp serialize_webhook_destination(destination) do
    %{
      id: destination.id,
      name: destination.name,
      description: destination.description,
      url: destination.url,
      secret_id: destination.secret_id,
      environment_key: destination.environment_key,
      subscriptions: destination.subscriptions || [],
      enabled: destination.enabled,
      metadata: destination.metadata || %{},
      inserted_at: destination.inserted_at,
      updated_at: destination.updated_at
    }
  end

  defp serialize_webhook_delivery(delivery) do
    %{
      id: delivery.id,
      destination_id: delivery.webhook_destination_id,
      event_id: delivery.webhook_outbound_event_id,
      state: delivery.state,
      attempt_count: delivery.attempt_count,
      last_attempt_at: delivery.last_attempt_at,
      next_attempt_at: delivery.next_attempt_at,
      terminal_failure_reason: delivery.terminal_failure_reason,
      last_response_code: delivery.last_response_code,
      last_response_body: delivery.last_response_body,
      inserted_at: delivery.inserted_at,
      updated_at: delivery.updated_at,
      # Preloaded associations
      event: maybe_serialize_outbound_event(delivery.webhook_outbound_event),
      destination: maybe_serialize_destination_summary(delivery.webhook_destination)
    }
  end

  defp maybe_serialize_outbound_event(%Ecto.Association.NotLoaded{}), do: nil
  defp maybe_serialize_outbound_event(nil), do: nil

  defp maybe_serialize_outbound_event(event) do
    %{
      id: event.id,
      event_type: event.event_type,
      payload: event.payload,
      resource_type: event.resource_type,
      resource_key: event.resource_key,
      environment_key: event.environment_key,
      correlation_id: event.correlation_id,
      inserted_at: event.inserted_at
    }
  end

  defp maybe_serialize_destination_summary(%Ecto.Association.NotLoaded{}), do: nil
  defp maybe_serialize_destination_summary(nil), do: nil

  defp maybe_serialize_destination_summary(destination) do
    %{
      id: destination.id,
      name: destination.name,
      url: destination.url
    }
  end

  defp maybe_filter_destination(query, _field, nil), do: query

  defp maybe_filter_destination(query, field_name, value) do
    where(query, [d], field(d, ^field_name) == ^to_string(value))
  end

  defp maybe_filter_delivery(query, _field, nil), do: query

  defp maybe_filter_delivery(query, field_name, value) do
    where(query, [d], field(d, ^field_name) == ^value)
  end

  defp enqueue_webhook_deliveries(multi, event_type, payload_fn, opts) do
    env_key = Keyword.get(opts, :environment_key)
    resource_type = Keyword.get(opts, :resource_type)
    resource_key = Keyword.get(opts, :resource_key)

    Multi.run(multi, :"outbound_webhooks_#{event_type}", fn repo, changes ->
      destinations =
        repo.all(
          from(d in Rulestead.Webhooks.Destination,
            where:
              d.enabled == true and ^event_type in d.subscriptions and
                (is_nil(d.environment_key) or d.environment_key == ^env_key)
          )
        )

      if Enum.empty?(destinations) do
        {:ok, []}
      else
        correlation_id = get_correlation_id(changes) || Ecto.UUID.generate()
        payload = payload_fn.(changes)

        {:ok, event} =
          Rulestead.Webhooks.OutboundEvent.changeset(%Rulestead.Webhooks.OutboundEvent{}, %{
            event_type: event_type,
            payload: payload,
            resource_type: resource_type,
            resource_key: resource_key,
            environment_key: env_key,
            correlation_id: correlation_id
          })
          |> repo.insert()

        deliveries =
          Enum.map(destinations, fn dest ->
            {:ok, delivery} =
              Rulestead.Webhooks.Delivery.changeset(%Rulestead.Webhooks.Delivery{}, %{
                webhook_destination_id: dest.id,
                webhook_outbound_event_id: event.id,
                state: :pending,
                attempt_count: 0
              })
              |> repo.insert()

            job = Rulestead.Oban.webhook_delivery_job(delivery.id)
            repo.insert_all("oban_jobs", [job], returning: [:id])

            delivery
          end)

        {:ok, deliveries}
      end
    end)
  end

  @doc false
  def requeue_webhook_delivery(delivery, schedule_in \\ 0) do
    job = Rulestead.Oban.webhook_delivery_job(delivery.id, schedule_in: schedule_in)
    Repo.insert_all("oban_jobs", [job], returning: [:id])
    :ok
  end

  defp get_correlation_id(%{audit_event: audit_event}), do: audit_event.correlation_id
  defp get_correlation_id(%{change_request: cr}), do: cr.correlation_id
  defp get_correlation_id(_), do: nil
end