# lib/walex/event.ex

defmodule WalEx.Event do
  require Logger
  import WalEx.TransactionFilter

  alias WalEx.Changes
  alias WalEx.Event

  # Event struct; every field defaults to nil.
  #
  #   table            - table name as an atom (set by cast/1)
  #   type             - :insert | :update | :delete (set by cast/1)
  #   new_record       - record after the change (nil for deletes)
  #   old_record       - record before the change (nil for inserts)
  #   changes          - result of changes/2 for updates, otherwise nil
  #   commit_timestamp - copied from the source change struct
  defstruct [:table, :type, :new_record, :old_record, :changes, :commit_timestamp]

  @doc """
  Injects event-filtering functions and handler-defining macros into the calling module.

  ## Options

    * `:name` - read with `Keyword.get/2` (so `nil` when absent) and passed as the
      first argument to every `WalEx.Event.filter_and_cast` call made by the
      injected `filter_events` functions.

  ## Injected definitions

    * `filter_events/1` and `filter_events/4` - delegate to
      `WalEx.Event.filter_and_cast/2` and `WalEx.Event.filter_and_cast/5`.
    * `process_events_async/2` - macro that starts one `Task` per
      event/function pair.
    * `on_event/2..4` - macros that define `process_all/1`.
    * `on_insert/2..4`, `on_update/2..4`, `on_delete/2..4` - macros that define
      `process_insert/1`, `process_update/1` and `process_delete/1`.

  In every macro, `do_block` is invoked as a one-argument function with the
  non-empty list of filtered events; when filtering yields anything else the
  generated function returns `{:error, :no_events}`.
  """
  defmacro __using__(opts) do
    # Evaluated at expansion time; the value is embedded into the injected code below.
    app_name = Keyword.get(opts, :name)

    quote do
      # Casts the events of `txn` for the configured app name (no table/type filtering here).
      def filter_events(txn) do
        Event.filter_and_cast(unquote(app_name), txn)
      end

      # Same as above, additionally forwarding `table`, `type` and `filters`.
      def filter_events(txn, table, type, filters) do
        Event.filter_and_cast(unquote(app_name), txn, table, type, filters)
      end

      # Expands to code that, for every event and every entry in `functions`,
      # starts a Task calling that function with the event as its only argument.
      # The Task results are discarded (fire-and-forget).
      defmacro process_events_async(events, functions) do
        # First entry of the caller's context modules; used as the target module for
        # bare-atom function names. Presumably the module invoking the macro - confirm.
        module = hd(__CALLER__.context_modules)

        quote do
          Enum.each(unquote(events), fn event ->
            unquote(functions)
            |> Enum.each(fn function ->
              task_fn =
                case function do
                  # If function is a tuple, treat it as {Module, function}
                  {mod, func} when is_atom(mod) and is_atom(func) ->
                    fn -> apply(mod, func, [event]) end

                  # If function is an atom, treat it as a local function in the current module
                  # TODO: consider disallowing bare atoms
                  func when is_atom(func) ->
                    fn -> apply(unquote(module), func, [event]) end

                  # Anything else is rejected before any Task is started for this entry.
                  _ ->
                    raise ArgumentError, "Invalid function: #{inspect(function)}"
                end

              Task.start(task_fn)
            end)
          end)
        end
      end

      # `on_event(:all, fun)`: defines `process_all/1`, which hands every event returned
      # by `filter_events/1` to `fun`. No async functions are run in this variant.
      defmacro on_event(:all, do_block) do
        quote do
          def process_all(txn) do
            case filter_events(txn) do
              filtered_events when is_list(filtered_events) and filtered_events != [] ->
                unquote(do_block).(filtered_events)

              _ ->
                {:error, :no_events}
            end
          end
        end
      end

      # `on_event(table, filters, functions, fun)`: also defines `process_all/1`, but
      # filters by `table` with the type argument set to nil (presumably "any type" -
      # confirm against filter_changes). `functions` are started asynchronously
      # before `fun` is called.
      defmacro on_event(table, filters \\ %{}, functions \\ [], do_block) do
        quote do
          def process_all(txn) do
            case filter_events(txn, unquote(table), unquote(nil), unquote(filters)) do
              filtered_events when is_list(filtered_events) and filtered_events != [] ->
                process_events_async(filtered_events, unquote(functions))
                unquote(do_block).(filtered_events)

              _ ->
                {:error, :no_events}
            end
          end
        end
      end

      # Shared code generator for on_insert/on_update/on_delete: returns the quoted
      # definition of `process_<type>/1` for the given table, filters and callbacks.
      defp process_event(table, type, filters, functions, do_block) do
        quote do
          def unquote(:"process_#{type}")(txn) do
            case filter_events(txn, unquote(table), unquote(type), unquote(filters)) do
              filtered_events when is_list(filtered_events) and filtered_events != [] ->
                process_events_async(filtered_events, unquote(functions))
                unquote(do_block).(filtered_events)

              _ ->
                {:error, :no_events}
            end
          end
        end
      end

      # Defines `process_insert/1` for `table`.
      defmacro on_insert(table, filters \\ %{}, functions \\ [], do_block) do
        process_event(table, :insert, filters, functions, do_block)
      end

      # Defines `process_update/1` for `table`.
      defmacro on_update(table, filters \\ %{}, functions \\ [], do_block) do
        process_event(table, :update, filters, functions, do_block)
      end

      # Defines `process_delete/1` for `table`.
      defmacro on_delete(table, filters \\ %{}, functions \\ [], do_block) do
        process_event(table, :delete, filters, functions, do_block)
      end
    end
  end

  @doc """
  Converts a change struct into a `WalEx.Event`.

  Handles `Changes.NewRecord` ("INSERT"), `Changes.UpdatedRecord` ("UPDATE") and
  `Changes.DeletedRecord` ("DELETE"). For updates, `:changes` is computed with
  `changes/2` from the old and new records. Any other input yields `nil`.
  """
  @spec cast(term()) :: %__MODULE__{} | nil
  # NOTE(review): the table name is converted with String.to_atom/1, which creates
  # atoms dynamically; presumably table names form a small fixed set - confirm.
  def cast(%Changes.NewRecord{
        type: "INSERT",
        table: table_name,
        record: inserted,
        commit_timestamp: committed_at
      }) do
    # Fields not listed here keep their struct default of nil.
    %__MODULE__{
      table: String.to_atom(table_name),
      type: :insert,
      new_record: inserted,
      commit_timestamp: committed_at
    }
  end

  def cast(%Changes.UpdatedRecord{
        type: "UPDATE",
        table: table_name,
        record: updated,
        old_record: previous,
        commit_timestamp: committed_at
      }) do
    %__MODULE__{
      table: String.to_atom(table_name),
      type: :update,
      new_record: updated,
      old_record: previous,
      changes: changes(previous, updated),
      commit_timestamp: committed_at
    }
  end

  def cast(%Changes.DeletedRecord{
        type: "DELETE",
        table: table_name,
        old_record: removed,
        commit_timestamp: committed_at
      }) do
    %__MODULE__{
      table: String.to_atom(table_name),
      type: :delete,
      old_record: removed,
      commit_timestamp: committed_at
    }
  end

  # Anything that is not one of the three recognised change structs.
  def cast(_other), do: nil

  @doc """
  Casts the changes of `txn` that `filter_subscribed/2` returns for `app_name`.

  Each remaining change is converted with `cast/1`; the result is a list whose
  elements are `WalEx.Event` structs, or `nil` for changes `cast/1` does not recognise.
  """
  def filter_and_cast(app_name, txn) do
    subscribed_changes = filter_subscribed(txn, app_name)

    Enum.map(subscribed_changes, &cast/1)
  end

  @doc """
  Casts the changes of `txn` selected by `filter_changes/4` for `table` and `type`,
  then applies the optional filters found in `filters`.

  When `filters` is a map containing `:unwatched_records`, the cast events are passed
  through `filter_unwatched_records/2`; when it contains `:unwatched_fields`, they are
  passed through `filter_unwatched_fields/2` (records first, then fields). Any other
  `filters` value leaves the cast events untouched.
  """
  def filter_and_cast(app_name, txn, table, type, filters) do
    events =
      txn
      |> filter_changes(table, type, app_name)
      |> Enum.map(&cast/1)

    events
    |> apply_unwatched_records(filters)
    |> apply_unwatched_fields(filters)
  end

  # Runs the unwatched-records filter only when the key is present in `filters`.
  defp apply_unwatched_records(events, %{unwatched_records: unwatched_records}),
    do: filter_unwatched_records(events, unwatched_records)

  defp apply_unwatched_records(events, _filters), do: events

  # Runs the unwatched-fields filter only when the key is present in `filters`.
  defp apply_unwatched_fields(events, %{unwatched_fields: unwatched_fields}),
    do: filter_unwatched_fields(events, unwatched_fields)

  defp apply_unwatched_fields(events, _filters), do: events
end