lib/ash/notifier/pub_sub/pub_sub.ex

defmodule Ash.Notifier.PubSub do
  require Logger

  @publish %Spark.Dsl.Entity{
    name: :publish,
    target: Ash.Notifier.PubSub.Publication,
    describe: "Configure a given action to publish its results over a given topic.",
    examples: [
      "publish :create, \"created\"",
      """
      publish :assign, "assigned"
      """
    ],
    schema: Ash.Notifier.PubSub.Publication.schema(),
    args: [:action, :topic]
  }

  @publish_all %Spark.Dsl.Entity{
    name: :publish_all,
    target: Ash.Notifier.PubSub.Publication,
    describe: """
    Works the same as `publish`, except that it takes a type and publishes all actions of that type.
    """,
    examples: [
      "publish_all :create, \"created\""
    ],
    schema: Ash.Notifier.PubSub.Publication.publish_all_schema(),
    args: [:type, :topic]
  }

  @pub_sub %Spark.Dsl.Section{
    name: :pub_sub,
    describe: """
    A section for configuring how resource actions are published over pubsub
    """,
    examples: [
      """
      pub_sub do
        module MyEndpoint
        prefix "post"

        publish :destroy, ["destroyed", :id]
        publish :update, ["updated", :name], event: "name_change"
        publish_all :create, "created"
      end
      """
    ],
    entities: [
      @publish,
      @publish_all
    ],
    no_depend_modules: [:module],
    schema: [
      module: [
        type: :atom,
        doc: "The module to call `broadcast/3` on e.g module.broadcast(topic, event, message).",
        required: true
      ],
      prefix: [
        type: :string,
        doc:
          "A prefix for all pubsub messages, e.g `users`. A message with `created` would be published as `users:created`"
      ],
      delimiter: [
        type: :string,
        doc: "A delimiter for building topics. Default is a colon (:)"
      ],
      filter: [
        type: {:fun, 1},
        doc:
          "A filter for notifications. Receives a notification, and ignores it if the function returns a falsy value. Both this and filters on specific publications must return a truthy value for a notification to be emitted."
      ],
      transform: [
        type: {:fun, 1},
        doc:
          "A transformer for notifications. Specific transformers on each publication *override* this option"
      ],
      broadcast_type: [
        type: {:one_of, [:notification, :phoenix_broadcast, :broadcast]},
        default: :notification,
        doc: """
        What shape the event payloads will be in. See
        """
      ],
      name: [
        type: :atom,
        doc: "A named pub sub to pass as the first argument to broadcast."
      ]
    ]
  }

  @sections [@pub_sub]

  @moduledoc """
  A builtin notifier to help you publish events over any kind of pub-sub tooling.

  This is plug and play with `Phoenix.PubSub`, but could be used with any pubsub system.

  You configure a module that defines a `broadcast/3` function, and then add some "publications"
  which configure under what conditions an event should be sent and what the topic should be.

  ## Example

  ```elixir
  defmodule MyApp.User do
    use Ash.Resource,
      # ...
      notifiers: [Ash.Notifier.PubSub]

    # ...

    pub_sub do
      module MyAppWeb.Endpoint

      prefix "user"
      publish :update, ["updated", :_pkey]
    end
  end
  ```

  ## Debugging PubSub

  It can be quite frustrating when setting up pub_sub when everything appears to be set up properly, but
  you aren't receiving events. This usually means some kind of mismatch between the event names produced
  by the resource/config of your publications, and you can use the following flag to display debug
  information about all pub sub events.

  ```elixir
  config :ash, :pub_sub, debug?: true
  ```

  ## Topic Templates

  Often you want to include some piece of data in the thing being changed, like the `:id` attribute. This
  is done by providing a list as the topic, and using atoms which will be replaced by their corresponding
  values. They will ultimately be joined with `:`.

  For example:

  ```elixir
  prefix "user"

  publish :create, ["created", :user_id]
  ```

  This might publish a message to "user:created:1" for example.

  For updates, if the field in the template is being changed, a message is sent
  to *both* values. So if you change `user 1` to `user 2`, the same message would
  be published to `user:updated:1` and `user:updated:2`. If there are multiple
  attributes in the template, and they are all being changed, a message is sent for
  every combination of substitutions.

  ## Important

  If the previous value was `nil` or the field was not selected on the data passed into the action, then a
  notification is not sent for the previous value.

  If the new value is `nil` then a notification is not sent for the new value.

  ## Template parts

  Templates may contain lists, in which case all combinations of values in the list will be used. Add
  `nil` to the list if you want to produce a pattern where that entry is omitted.

  The atom `:_tenant` may be used. If the changeset has a tenant set on it, that
  value will be used, otherwise that combination of values is ignored.

  The atom `:_pkey` may be used. It will be a stringified, concatenation of the primary key fields,
  or just the primary key if there is only one primary key field.

  The atom `nil` may be used. It only makes sense to use it in the context of a list of alternatives,
  and adds a pattern where that part is skipped.

  ```elixir
  publish :updated, [[:team_id, :_tenant], "updated", [:id, nil]]
  ```

  Would produce the following messages, given a `team_id` of 1, a `tenant` of `org_1`, and an `id` of `50`:

  ```elixir
  "1:updated:50"
  "1:updated"
  "org_1:updated:50"
  "org_1:updated"
  ```

  ## Custom Delimiters

  It's possible to change the default delimiter used when generating topics. This is useful when working with message brokers
  like RabbitMQ, which rely on a different set of delimiters for routing.


  ```elixir
  pub_sub do
    delimiter "."
  end
  ```

  ## Named Pubsub modules

  If you are using a phoenix `Endpoint` module for pubsub then this is unnecessary. If you want to use a custom pub sub started
  with something like `{Phoenix.PubSub, name: MyName}`, then you can provide `MyName` to here.

  ## Broadcast Types

  Configured with `broadcast_type`.

  - `:notification` just sends the notification
  - `:phoenix_broadcast` sends a `%Phoenix.Socket.Broadcast{}` (see above)
  - `:broadcast` sends `%{topic: (topic), event: (event), payload: (notification)}`
  """

  use Spark.Dsl.Extension,
    sections: @sections,
    verifiers: [Ash.Notifier.PubSub.Verifiers.VerifyActionNames]

  use Ash.Notifier

  alias Ash.Notifier.PubSub.Info

  @doc false
  def notify(%Ash.Notifier.Notification{resource: resource} = notification) do
    filter = Ash.Notifier.PubSub.Info.filter(resource)

    resource
    |> Ash.Notifier.PubSub.Info.publications()
    |> Enum.filter(
      &(matches?(&1, notification.action) &&
          (is_nil(&1.filter) || &1.filter.(notification)) &&
          (is_nil(filter) || filter.(notification)))
    )
    |> Enum.each(&publish_notification(&1, notification))
  end

  @doc false
  def requires_original_data?(resource, action) do
    resource
    |> Ash.Notifier.PubSub.Info.publications()
    |> Enum.filter(&(&1.previous_values? && matches?(&1, action)))
    |> Enum.flat_map(fn publish ->
      publish.topic
      |> List.flatten()
      |> Enum.filter(&is_atom/1)
    end)
    |> Enum.any?(&Ash.Resource.Info.attribute(resource, &1))
  end

  defp publish_notification(publish, notification) do
    debug? = Application.get_env(:ash, :pub_sub)[:debug?] || false
    event = publish.event || to_string(notification.action.name)
    prefix = Ash.Notifier.PubSub.Info.prefix(notification.resource) || ""
    delimiter = Info.delimiter(notification.resource)

    topics =
      publish.topic
      |> fill_template(notification, delimiter, publish.previous_values?)
      |> Enum.map(fn topic ->
        case {prefix, topic} do
          {"", ""} -> ""
          {prefix, ""} -> prefix
          {"", topic} -> topic
          {prefix, topic} -> "#{prefix}#{delimiter}#{topic}"
        end
      end)

    if debug? do
      Logger.debug("""
      Broadcasting to topics #{inspect(topics)} via #{inspect(Ash.Notifier.PubSub.Info.module(notification.resource))}.broadcast

      Notification:

      #{inspect(notification)}
      """)
    end

    transform = publish.transform || Ash.Notifier.PubSub.Info.transform(notification.resource)

    value =
      if transform do
        transform.(notification)
      else
        notification
      end

    Enum.each(topics, fn topic ->
      args =
        case Ash.Notifier.PubSub.Info.name(notification.resource) do
          nil ->
            [topic, event, to_payload(topic, event, notification, value)]

          pub_sub ->
            payload = to_payload(topic, event, notification, value)

            [pub_sub, topic, payload]
        end

      args =
        case publish.dispatcher do
          nil ->
            args

          dispatcher ->
            args ++ dispatcher
        end

      apply(Ash.Notifier.PubSub.Info.module(notification.resource), :broadcast, args)
    end)
  end

  def to_payload(topic, event, notification, value) do
    case Ash.Notifier.PubSub.Info.broadcast_type(notification.resource) do
      :phoenix_broadcast ->
        %{
          __struct__: Phoenix.Socket.Broadcast,
          topic: topic,
          event: event,
          payload: value
        }

      :broadcast ->
        %{
          topic: topic,
          event: event,
          payload: value
        }

      :notification ->
        value
    end
  end

  defp fill_template(topic, _notification, _delimiter, _previous_values?) when is_binary(topic),
    do: [topic]

  defp fill_template(topic, notification, delimiter, previous_values?) do
    topic
    |> all_combinations_of_values(notification, notification.action.type, previous_values?)
    |> Enum.map(&List.flatten/1)
    |> Enum.map(&Enum.join(&1, delimiter))
    |> Enum.uniq()
  end

  defp all_combinations_of_values(
         items,
         notification,
         action_type,
         _previous_values?,
         trail \\ []
       )

  defp all_combinations_of_values([], _, _, _previous_values?, trail), do: [Enum.reverse(trail)]

  defp all_combinations_of_values(
         [nil | rest],
         notification,
         action_type,
         previous_values?,
         trail
       ) do
    all_combinations_of_values(rest, notification, action_type, previous_values?, trail)
  end

  defp all_combinations_of_values(
         [item | rest],
         notification,
         action_type,
         previous_values?,
         trail
       )
       when is_binary(item) do
    all_combinations_of_values(rest, notification, action_type, previous_values?, [item | trail])
  end

  defp all_combinations_of_values(
         [:_tenant | rest],
         notification,
         action_type,
         previous_values?,
         trail
       ) do
    if notification.changeset.to_tenant do
      all_combinations_of_values(rest, notification, action_type, previous_values?, [
        notification.changeset.to_tenant | trail
      ])
    else
      []
    end
  end

  defp all_combinations_of_values([:_pkey | rest], notification, type, true, trail)
       when type in [:update, :destroy] do
    pkey = Ash.Resource.Info.primary_key(notification.changeset.resource)
    pkey_value_before_change = Enum.map_join(pkey, "-", &Map.get(notification.changeset.data, &1))
    pkey_value_after_change = Enum.map_join(pkey, "-", &Map.get(notification.data, &1))

    [pkey_value_before_change, pkey_value_after_change]
    |> Enum.uniq()
    |> Enum.flat_map(fn possible_value ->
      all_combinations_of_values(rest, notification, type, true, [
        possible_value | trail
      ])
    end)
  end

  defp all_combinations_of_values([:_pkey | rest], notification, type, _, trail)
       when type in [:update, :destroy] do
    pkey = Ash.Resource.Info.primary_key(notification.changeset.resource)
    pkey_value_after_change = Enum.map_join(pkey, "-", &Map.get(notification.data, &1))

    all_combinations_of_values(rest, notification, type, false, [pkey_value_after_change | trail])
  end

  defp all_combinations_of_values(
         [:_pkey | rest],
         notification,
         action_type,
         previous_values?,
         trail
       ) do
    pkey = Ash.Resource.Info.primary_key(notification.changeset.resource)

    all_combinations_of_values(rest, notification, action_type, previous_values?, [
      Enum.map_join(pkey, "-", &Map.get(notification.data, &1)) | trail
    ])
  end

  defp all_combinations_of_values([item | rest], notification, type, true, trail)
       when is_atom(item) and type in [:update, :destroy] do
    value_before_change = Map.get(notification.changeset.data, item)
    value_after_change = Map.get(notification.data, item)

    [value_before_change, value_after_change]
    |> Enum.filter(&publishable_value?(&1, notification))
    |> Enum.uniq()
    |> Enum.flat_map(fn possible_value ->
      all_combinations_of_values(rest, notification, type, true, [possible_value | trail])
    end)
  end

  defp all_combinations_of_values([item | rest], notification, type, _, trail)
       when is_atom(item) and type in [:update, :destroy] do
    value_after_change = Map.get(notification.data, item)

    if publishable_value?(value_after_change, notification) do
      all_combinations_of_values(rest, notification, type, false, [value_after_change | trail])
    else
      []
    end
  end

  defp all_combinations_of_values(
         [item | rest],
         notification,
         action_type,
         previous_values?,
         trail
       )
       when is_atom(item) do
    value = Map.get(notification.data, item)

    if publishable_value?(value, notification) do
      all_combinations_of_values(rest, notification, action_type, previous_values?, [
        value | trail
      ])
    else
      []
    end
  end

  defp all_combinations_of_values(
         [item | rest],
         notification,
         action_type,
         previous_values?,
         trail
       )
       when is_list(item) do
    Enum.flat_map(item, fn possible_value ->
      all_combinations_of_values(
        [possible_value | rest],
        notification,
        action_type,
        previous_values?,
        trail
      )
    end)
  end

  defp publishable_value?(nil, _notification), do: false

  defp publishable_value?(
         %Ash.NotLoaded{field: field},
         %{topic: topic, resource: resource}
       ) do
    Logger.warning(
      "Not publishing notification `#{inspect(topic)}` for #{inspect(resource)} because `#{field}` is not loaded"
    )

    false
  end

  defp publishable_value?(%Ash.NotLoaded{field: field}, notification) do
    Logger.warning(
      "Not publishing notification for #{inspect(notification.resource)} because `#{field}` is not loaded"
    )

    false
  end

  defp publishable_value?(
         %Ash.ForbiddenField{field: field},
         %{topic: topic, resource: resource}
       ) do
    Logger.warning(
      "Not publishing notification `#{inspect(topic)}` for #{inspect(resource)} because `#{field}` is an `%Ash.ForbiddenField{}`"
    )

    false
  end

  defp publishable_value?(%Ash.ForbiddenField{field: field}, notification) do
    Logger.warning(
      "Not publishing notification for #{inspect(notification.resource)} because `#{field}` is an `%Ash.ForbiddenField{}`"
    )

    false
  end

  defp publishable_value?(_, _), do: true

  defp matches?(%{action: action}, %{name: action}), do: true

  defp matches?(%{type: type, except: except}, %{type: type, name: action}) do
    action not in except
  end

  defp matches?(_, _), do: false
end