lib/trolleybus.ex

defmodule Trolleybus do
  @moduledoc """
  Defines a local, application-level PubSub API for dispatching side effects.

  This PubSub mechanism is dedicated to handling side effects in business logic.
  Instead of calling side effects directly, an event is published. The event is
  then routed to one or more handlers, according to the declared routing.

  ## Example

  Let's assume we have a code path in business logic where we want to trigger
  two side effects after core logic is done.

      def accept_invite(...) do
        ...
        App.Emails.notify_invite_accepted(document, inviter, user)
        App.Webhooks.notify_auth_update(document, user)
        ...
      end

  First, we define the event using `Trolleybus.Event`:

      defmodule App.Documents.Events.MembershipInviteAccepted do
        use Trolleybus.Event

        handler App.Documents.EmailEventHandler
        handler App.Webhooks.EventHandler

        message do
          field :membership, %App.Memberships.Membership{}
          field :document, %App.Documents.Document{}
          field :inviter, %App.Users.User{}, required: false
          field :user, %App.Users.User{}
        end
      end

  Next, we define new or extend existing event handlers using
  `Trolleybus.Handler`:

      defmodule App.Documents.EmailEventHandler do
        use Trolleybus.Handler

        alias App.Documents.Events.MembershipInviteAccepted

        def handle_event(%MembershipInviteAccepted{
          document: document,
          inviter: inviter,
          user: user
        }) do
          App.Emails.notify_invite_accepted(document, inviter, user)
        end
        ...
      end

      defmodule App.Webhooks.EventHandler do
        use Trolleybus.Handler

        alias App.Documents.Events.MembershipInviteAccepted

        def handle_event(%MembershipInviteAccepted{
          document: document,
          user: user
        }) do
          App.Webhooks.notify_auth_update(document, user)
        end
        ...
      end

  Finally, we publish an event instead of calling `Emails` and `Webhooks`
  directly:

      alias App.Documents.Events

      def accept_invite(...) do
        ...
        Trolleybus.publish(%Events.MembershipInviteAccepted{
          membership: membership,
          document: document,
          inviter: inviter,
          user: user
        })
        ...
      end

  ## Publishing

  A `publish/2` call triggers the dispatch logic for the event. Dispatch
  retrieves all handlers declared in the event's definition and passes the
  event to each of them.

  Publishing may be executed in three modes:

    * fully synchronous `:full_sync` (default) - executes all handlers
      sequentially and synchronously, within the same process as caller,
    * asynchronous `:async` - executes side effects in separate processes
      not waiting for them to finish executing,
    * synchronous `:sync` - blocks until all processes executing event
      handlers complete.

  In `:sync` mode, the wait time is limited by a timeout set via the
  `:sync_timeout` option, expressed in milliseconds (defaults to 5000). Any
  handler process running for longer than the configured timeout is killed.
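
  The timeout behavior can be illustrated outside of `Trolleybus` with a minimal
  sketch built on `Task.Supervisor.async_stream_nolink/4`. The anonymous
  functions, the 100 ms timeout and the `:some_event` atom are assumptions made
  for illustration only; they stand in for real handler modules and events.

  ```elixir
  # Illustrative only: plain anonymous functions stand in for handler modules.
  {:ok, supervisor} = Task.Supervisor.start_link()

  handlers = [
    fn event -> {:handled, event} end,
    # Hypothetical slow handler that outlives the timeout below.
    fn _event -> Process.sleep(1_000) end
  ]

  results =
    supervisor
    |> Task.Supervisor.async_stream_nolink(
      handlers,
      fn handler -> handler.(:some_event) end,
      ordered: true,
      on_timeout: :kill_task,
      timeout: 100
    )
    |> Enum.to_list()
  ```

  In this sketch the fast handler yields an `{:ok, result}` entry, while the slow
  one is killed and reported as `{:exit, :timeout}`. The caller is blocked only
  for roughly the duration of the timeout.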

  For simplicity, `Trolleybus` is concerned with neither persistence nor retrying
  failed handler calls. If a particular case calls for these kinds of guarantees,
  they can be provided by, for instance, having the relevant handler use a job
  processing library like [Oban](https://hexdocs.pm/oban/Oban.html).

  ## Buffering

  There are cases where publishing events has to be either deferred or
  abandoned completely. This can be done with one of the buffering
  wrappers: `transaction/1`, `buffered/1` or `muffled/1`. The underlying
  mechanism is the same for all three. Before the wrapped function is executed,
  a buffer is opened. The buffer is an agent process storing a list of events
  along with their respective publish options, if any are passed. Each call to
  `publish/2` inside that function puts the event in that list instead of
  actually publishing it. Once the function finishes executing, further behavior
  depends on the wrapper. `buffered/1` and `muffled/1` discard the events (the
  former also returns them, along with the result). `transaction/1` fetches the
  contents of the current buffer, closes it and re-publishes all the events from
  the list, provided the function returned an `{:ok, value}` tuple; otherwise
  the events are discarded. If wrappers are nested, re-publishing puts the
  events in the buffer opened by the outer wrapper. Eventually, the events
  get either dispatched to their respective handlers or discarded.

  The current buffer as well as the buffer stack are tracked using the process
  dictionary. This means that buffering only works for code executed within a
  single process.
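
  The single-process limitation can be seen in a simplified stand-in for the
  mechanism. `BufferSketch` below is a hypothetical module written for
  illustration, not the actual `Trolleybus` implementation; its dictionary keys
  and the `{:dispatched, event}` return value are assumptions.

  ```elixir
  defmodule BufferSketch do
    # Hypothetical stand-in for the buffering mechanism described above.
    @current :sketch_current_buffer
    @stack :sketch_buffer_stack

    # Opens a buffer, runs `fun` and returns {result, events in publish order}.
    def buffered(fun) do
      {:ok, pid} = Agent.start_link(fn -> [] end)
      push(pid)

      try do
        result = fun.()
        {result, pid |> Agent.get(& &1) |> Enum.reverse()}
      after
        pop()
        Agent.stop(pid)
      end
    end

    # Stores the event in the innermost buffer known to the calling process.
    # Without an open buffer, the event would be dispatched right away.
    def publish(event) do
      case Process.get(@current) do
        nil -> {:dispatched, event}
        pid -> Agent.update(pid, fn events -> [event | events] end)
      end
    end

    # Remembers the outer buffer (if any) on a stack kept in the process dictionary.
    defp push(pid) do
      case Process.get(@current) do
        nil -> :ok
        outer -> Process.put(@stack, [outer | Process.get(@stack, [])])
      end

      Process.put(@current, pid)
    end

    # Restores the outer buffer, or clears the entry when none is left.
    defp pop() do
      case Process.get(@stack, []) do
        [] ->
          Process.delete(@current)

        [outer | rest] ->
          Process.put(@stack, rest)
          Process.put(@current, outer)
      end
    end
  end

  {task_result, events} =
    BufferSketch.buffered(fn ->
      BufferSketch.publish(:first)
      # The task runs in another process, whose dictionary has no buffer entry.
      Task.await(Task.async(fn -> BufferSketch.publish(:from_task) end))
    end)
  ```

  Here only `:first` ends up in `events`. The publish call made from the task
  bypasses the buffer, because the task process has its own, empty process
  dictionary.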

  ## Events inside transactions

  In case of more complex logic, there can be multiple events published along a
  single code path. This code path can in turn contain multiple subroutines
  changing system state, wrapped in one big transaction. It's often hard to
  defer all the side effects outside of that transaction without clunky
  workarounds. That's where `transaction/1` comes into play. It's especially
  useful when events are published inside a large, multi-stage `Ecto.Multi` based
  transaction.

  Assuming we have events published somewhere inside a big transaction, like
  this:

      def update_memberships(multi) do
        Ecto.Multi.run(multi, :updated_memberships,
          fn _repo, %{old_memberships: memberships} ->
            updated_memberships = Enum.map(memberships, fn membership ->
              Trolleybus.publish(%MembershipChanged{...})

              ...
            end)

            {:ok, updated_memberships}
          end)
      end

  we can defer actual publishing of all events until after the transaction is
  executed by wrapping the top level `Ecto` transaction with `Trolleybus`
  transaction:

      def process(multi) do
        case Trolleybus.transaction(fn -> Repo.transaction(multi) end) do
          {:ok, ...} ->
            ...
        end
      end

  ## Nesting transactions

  Buffered transactions wrapped with `transaction/1` can be arbitrarily nested
  and it's guaranteed that only the outermost one will publish the events:

      Trolleybus.transaction(fn -> # all events will be published
                                   # only after this outer block finishes
        ...
        Trolleybus.transaction(fn ->
          ...
        end)
        ...
      end)

  ## Reusing code publishing events

  We often want to reuse existing logic as a part of larger, more complex
  routines. The problem is that this existing logic may already publish events
  specific to that original context. This is often undesirable, because we
  want to emit events specific to the wrapping routine and need the existing
  logic only for its data manipulation part. Another case where this may be an
  issue is reusing business logic, instead of synthetic factories, to set up
  system state in tests. To suppress the unwanted events, we can wrap the reused
  code with `muffled/1`:

      {:ok, %{membership: accepted_membership}} = Trolleybus.muffled(fn ->
        AcceptInvite.accept_membership_invite(user, membership)
      end)

  Wrapping it this way will send all the events to a buffer which will be
  promptly discarded after the code block completes.

  ## Testing events

  When we want to test which events a given piece of logic publishes
  instead of relying on checking side effects, we can use `buffered/1`:

      {{:ok, %{membership: _membership}}, events} = Trolleybus.buffered(fn ->
        AcceptInvite.accept_membership_invite(user, membership)
      end)

      assert [{%MembershipInviteAccepted{}, []}] = events

  Events published inside `buffered/1` are stored in a buffer, whose contents
  are returned along with the result of the code block in a tuple. The events
  are then promptly discarded, so no handler gets triggered.

  Another potentially handy function is `get_buffer/0`, which lets us "peek"
  into the contents of the buffer at any point while a buffer is open. It can
  be used inside any of `buffered/1`, `muffled/1` or `transaction/1` blocks:

      Trolleybus.muffled(fn ->
        ...

        assert [{%SomeEvent{}, []}] = Trolleybus.get_buffer()

        ...
      end)

  One important thing to note is that publishing mode can be overridden in all
  calls using Application configuration:

      config :trolleybus, mode_override: :full_sync

  This allows users to force all events to be published using a given mode, for
  example `:full_sync` in `config/test.exs` to make testing side effects
  simpler. This lets us avoid issues related to running handlers in a separate
  process, like having to explicitly handle `Ecto` sandbox allowance.

  ## Listing routes

  In order to print all events and associated handlers in the project,
  a dedicated mix task can be run:

      mix trolleybus.routes

  The output has the following form:

      * App.Events.DocumentTransferred
          => App.Webhooks.EventHandler
          => App.Memberships.EmailEventHandler

      * App.Events.UserInvitedToDocument
          => App.Memberships.EmailEventHandler

      ...
  """

  alias Trolleybus.TaskSupervisor

  require Logger

  @type publish_mode() :: :full_sync | :async | :sync

  @type publish_option() :: {:mode, publish_mode()} | {:sync_timeout, non_neg_integer()}

  @name_field :bus_current_buffer
  @stack_field :bus_stack

  @doc """
  Publishes an event.

  ## Example

      :ok = Trolleybus.publish(%SomeEvent{name: "value"})
      :ok = Trolleybus.publish(%OtherEvent{flag: true}, mode: :async)
      :ok = Trolleybus.publish(
        %AnotherEvent{number: 123}, mode: :sync, sync_timeout: 1_000
      )

  ## Options

    * `:mode` - Event dispatch mode. Can be one of `:full_sync`, `:async`
      or `:sync`. See "Dispatch modes" below for detailed explanation.
      Default: `:full_sync`.
    * `:sync_timeout` - Timeout in milliseconds after which synchronous
      dispatch is cancelled and handler processes still in progress are
      killed. Works only for `:sync` mode. Default: 5000

  ## Dispatch modes

  Publishing may be executed in three modes:

    * `:full_sync` - Executes all handlers sequentially and synchronously,
      within the same process as caller.
    * `:async` - Executes handlers in separate processes fully asynchronously,
      not waiting for them to finish executing.
    * `:sync` - Executes handlers in separate processes in parallel and waits
      until they complete executing. Waiting time is determined by
      `:sync_timeout` option. Handler processes running past timeout are killed.
  """
  @spec publish(struct(), [publish_option()]) :: :ok
  def publish(event, opts \\ []) do
    %event_module{} = event

    event = event_module.cast!(event)

    case current() do
      {:buffer, pid} ->
        publish_to_buffer(pid, event, opts)

      __MODULE__ ->
        publish_directly(event, opts)
    end
  end

  @doc """
  Returns events published in the given function.

  The events are returned along with the result of the function, without
  dispatching them to defined handlers.

  Order of events in the list is always consistent with order of publishing
  inside the wrapped function.

  ## Example

      {"result", [{%SomeEvent{}, []},
                  {%OtherEvent{}, mode: :async},
                  {%AnotherEvent{}, mode: :sync, sync_timeout: 1_000}]} =
        Trolleybus.buffered(fn ->
          Trolleybus.publish(%SomeEvent{name: "value"})
          Trolleybus.publish(%OtherEvent{flag: true}, mode: :async)
          Trolleybus.publish(%AnotherEvent{number: 123}, mode: :sync, sync_timeout: 1_000)

          "result"
        end)
  """
  @spec buffered((() -> result)) :: {result, [{struct(), [publish_option()]}]}
        when result: term()
  def buffered(fun) when is_function(fun, 0) do
    buffer = open_buffer()

    try do
      result = fun.()
      {result, get_buffer()}
    after
      if current() == buffer do
        close_buffer()
      end
    end
  end

  @doc """
  Discards events published in the given function without dispatching them
  to defined handlers.

  ## Example

      "result" =
        Trolleybus.muffled(fn ->
          Trolleybus.publish(%SomeEvent{name: "value"})

          "result"
        end)
  """
  @spec muffled((() -> result)) :: result when result: term()
  def muffled(fun) when is_function(fun, 0) do
    buffer = open_buffer()

    try do
      fun.()
    after
      if current() == buffer do
        close_buffer()
      end
    end
  end

  @doc """
  Lists all events published so far that have not been dispatched yet.

  Order of events in the list is always consistent with order of publishing
  inside the wrapped function.

  Raises a `RuntimeError` when called while no buffer is open.

  ## Example

      Trolleybus.muffled(fn ->
        Trolleybus.publish(%SomeEvent{name: "value"})
        Trolleybus.publish(%OtherEvent{flag: true}, mode: :async)

        [{%SomeEvent{}, []},
         {%OtherEvent{}, mode: :async}] = Trolleybus.get_buffer()

        Trolleybus.publish(%AnotherEvent{number: 123}, mode: :sync, sync_timeout: 1_000)

        [{%SomeEvent{}, []},
         {%OtherEvent{}, mode: :async},
         {%AnotherEvent{}, mode: :sync, sync_timeout: 1_000}] = Trolleybus.get_buffer()
      end)
  """
  @spec get_buffer() :: [{struct(), [publish_option()]}]
  def get_buffer() do
    case current() do
      {:buffer, pid} ->
        Agent.get(pid, fn buffer -> Enum.reverse(buffer) end)

      __MODULE__ ->
        raise "No buffer to get."
    end
  end

  @doc """
  Dispatches published events after running the given function.

  Dispatching to event handlers is done only if the function returns a
  two-element `{:ok, value}` tuple. Otherwise, events are discarded.

  ## Example

      {:ok, "result"} =
        Trolleybus.transaction(fn ->
          Trolleybus.publish(%SomeEvent{name: "value"})

          {:ok, "result"}
        end)
  """
  @spec transaction((() -> result)) :: result when result: term()
  def transaction(fun) when is_function(fun, 0) do
    buffer = open_buffer()

    try do
      case fun.() do
        {:ok, _} = result ->
          commit_buffer()
          result

        other ->
          other
      end
    after
      if current() == buffer do
        close_buffer()
      end
    end
  end

  defp open_buffer() do
    {:ok, pid} = Agent.start_link(fn -> [] end)

    set_current({:buffer, pid})
  end

  defp commit_buffer() do
    case current() do
      {:buffer, _pid} ->
        buffer = get_buffer()
        close_buffer()

        for {event, opts} <- buffer do
          publish(event, opts)
        end

      __MODULE__ ->
        raise "No buffer to commit."
    end
  end

  defp close_buffer() do
    case current() do
      {:buffer, pid} ->
        pop_current()
        Agent.stop(pid)

      __MODULE__ ->
        raise "No buffer to close."
    end
  end

  defp current() do
    Process.get(@name_field, __MODULE__)
  end

  defp set_current(name) do
    stack = Process.get(@stack_field, [])
    current_name = Process.get(@name_field)

    if current_name do
      Process.put(@stack_field, [current_name | stack])
    end

    Process.put(@name_field, name)

    name
  end

  defp pop_current() do
    stack = Process.get(@stack_field, [])

    case stack do
      [] ->
        Process.delete(@name_field)

      [head | stack] ->
        Process.put(@stack_field, stack)
        Process.put(@name_field, head)
    end
  end

  defp publish_to_buffer(pid, event, opts) do
    Agent.update(pid, fn buffer -> [{event, opts} | buffer] end)
  end

  defp publish_directly(event, opts) do
    %event_type{} = event

    # Users can specify a mode via `opts`
    default_mode = Keyword.get(opts, :mode, :full_sync)

    # The mode can also be overridden in all circumstances using application
    # config.
    # This is useful, for example, to make sure that events are always published
    # synchronously in tests to avoid race conditions.
    mode = Application.get_env(:trolleybus, :mode_override, default_mode)

    sync_timeout = Keyword.get(opts, :sync_timeout, 5_000)

    handlers = event_type.__handlers__()

    case mode do
      :full_sync ->
        Enum.each(handlers, &run_handler(&1, event))

      :sync ->
        run_handlers_synchronously(handlers, event, sync_timeout)

      :async ->
        run_handlers_asynchronously(handlers, event)
    end
  end

  defp run_handlers_asynchronously(handlers, event) do
    Enum.each(handlers, fn handler ->
      Task.Supervisor.start_child(TaskSupervisor, fn ->
        run_handler(handler, event)
      end)
    end)
  end

  defp run_handlers_synchronously(handlers, event, sync_timeout) do
    results =
      Task.Supervisor.async_stream_nolink(
        TaskSupervisor,
        handlers,
        &run_handler(&1, event),
        ordered: true,
        on_timeout: :kill_task,
        timeout: sync_timeout
      )

    Enum.each(results, fn result ->
      case result do
        {:ok, _result} ->
          :ok

        error_or_exit ->
          Logger.error(
            "[#{inspect(__MODULE__)}] Dispatch failed while publishing #{inspect(event)}. " <>
              "Got: #{inspect(error_or_exit)}",
            grouping_title: "Dispatch failed while publishing #{inspect(event.__struct__)}",
            extra_info: %{event: inspect(event), error: inspect(error_or_exit)}
          )
      end
    end)
  end

  defp run_handler(handler, event) do
    handler.handle_event(event)
  catch
    kind, reason ->
      formatted = Exception.format(kind, reason, __STACKTRACE__)

      Logger.error(
        "[#{inspect(handler)}] Event handler failed with #{formatted} for event " <>
          "#{inspect(event)}",
        grouping_title:
          "Event handler #{inspect(handler)} failed for event #{inspect(event.__struct__)}",
        extra_info: %{
          handler: inspect(handler),
          event: inspect(event),
          exception: inspect(formatted)
        }
      )
  end
end