lib/off_broadway/producer.ex

with {:module, _} <- Code.ensure_compiled(Broadway) do
  defmodule OffBroadway.Polyn.Producer do
    @moduledoc """
    A [Broadway](https://hexdocs.pm/broadway/Broadway.html) Producer for Polyn.

    The word `Producer` here can be confusing because it is overloaded.
    In this module, `Producer` refers to a [GenStage](https://hexdocs.pm/gen_stage/GenStage.html) data
    pipeline, where a `:producer` is the stage that receives demand for data and sends it to a `:consumer`.
    This module doesn't "produce" new events that get added to the NATS server for other services to consume.
    Rather, it consumes existing events from a NATS stream and passes them to GenStage `:consumer` modules
    in one application.

    ## Usage

    This module wraps `OffBroadway.Jetstream.Producer` and validates that any messages coming through
    are valid Polyn events that conform to their event schema. Refer to the `OffBroadway.Jetstream.Producer`
    documentation to learn how to use it. One difference is that you put `OffBroadway.Polyn.Producer`
    in your `:module` configuration instead of the Jetstream one. Invalid messages send an `ACKTERM`
    to the NATS server so that they aren't sent again; they are marked as `failed` and removed from the pipeline.
    Valid messages that arrive in the same batch as an invalid message send a `NACK` response before an error
    is raised, so the NATS server knows they were received but need to be sent again.

    Another key difference is that Polyn takes care of the `:consumer_name` for you by deriving it
    from the passed `:type` and the configured `:source_root`. You can pass a `:source` option to
    `:module` to get a more specific `:consumer_name`.

    You can also pass a `:stream_name` option to `:module` to use a stream name that isn't derived
    from the `:type`, as in the sketch below.
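
    For instance, the `:module` tuple with both overrides might look like the following
    (the `:source` and `:stream_name` values here are illustrative):

    ```elixir
    {
      OffBroadway.Polyn.Producer,
      connection_name: :gnat,
      type: "user.created.v1",
      source: "notifications",
      stream_name: "USERS"
    }
    ```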

    ## Example

    ```elixir
    defmodule MyBroadway do
      use Broadway

      alias Broadway.Message

      def start_link(_opts) do
        Broadway.start_link(
          __MODULE__,
          name: MyBroadway,
          producer: [
            module: {
              OffBroadway.Polyn.Producer,
              connection_name: :gnat,
              type: "user.created.v1"
            },
            concurrency: 10
          ],
          processors: [
            default: [concurrency: 10]
          ],
          batchers: [
            example: [
              concurrency: 5,
              batch_size: 10,
              batch_timeout: 2_000
            ]
          ]
        )
      end

      def handle_message(_processor_name, message, _context) do
        message
        |> Message.update_data(&process_data/1)
        |> Message.put_batcher(:example)
      end
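
      # `handle_batch/4` is required because a batcher is configured above.
      def handle_batch(:example, messages, _batch_info, _context) do
        # A minimal sketch: pass the batch through unchanged.
        messages
      end

      # Hypothetical helper referenced in `handle_message/3`; replace it with
      # your own transformation of the deserialized event.
      defp process_data(event), do: event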
    end
    ```
    """
    use GenStage
    use Polyn.Tracing

    alias Broadway.{Message, Producer}
    alias Polyn.SchemaStore
    alias Polyn.Serializers.JSON

    @behaviour Producer

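    # Broadway producer callbacks are delegated to the underlying producer module
    # (the real Jetstream producer, or a mock in sandbox mode).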
    @impl Producer
    def prepare_for_start(module, broadway_opts) do
      opts = elem(broadway_opts[:producer][:module], 1)
      producer(opts).prepare_for_start(module, broadway_opts)
    end

    @impl Producer
    def prepare_for_draining(state) do
      state.producer.prepare_for_draining(state)
    end

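    # Responses from the wrapped producer's `handle_info/2` may contain messages,
    # so they get the same validation as demand-driven ones.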
    @impl GenStage
    def handle_info(any, state) do
      response = state.producer.handle_info(any, state)

      case Tuple.to_list(response) do
        [:noreply, messages, state | rest] -> process_messages(messages, state, rest)
        _other -> response
      end
    end

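    # Derives the consumer and stream names, initializes the underlying producer,
    # and augments its state with what validation and tracing need later.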
    @impl GenStage
    def init(opts) do
      opts = add_consumer_and_stream_name(opts)
      producer = producer(opts)

      [:producer, state | rest] = opts |> producer.init() |> Tuple.to_list()

      state =
        state
        |> Map.put(:store_name, store_name(opts))
        |> Map.put(:producer, producer)
        |> Map.put(:acknowledger, acknowledger(opts))
        |> Map.put(:type, opts[:type])

      [:producer, state | rest] |> List.to_tuple()
    end

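    # Forwards demand to the underlying producer and validates any messages it returns.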
    @impl GenStage
    def handle_demand(incoming_demand, state) do
      response = state.producer.handle_demand(incoming_demand, state)

      case Tuple.to_list(response) do
        [:noreply, messages, state | rest] ->
          process_messages(messages, state, rest)

        _other ->
          response
      end
    end

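    # Deserializes every message within a processing span and raises if any
    # message in the batch failed validation.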
    defp process_messages(messages, state, rest) do
      Polyn.Tracing.processing_span state.type do
        store_name = state.store_name
        messages = Enum.map(messages, &message_to_event(store_name, state, &1))

        handle_invalid_messages!(messages, state)

        [:noreply, messages, state | rest] |> List.to_tuple()
      end
    end

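    # Deserializes the raw NATS message into a Polyn event inside a subscribe span.
    # On success the message data becomes the event; on failure the ack is configured
    # to TERM and the message is marked as failed.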
    defp message_to_event(store_name, %{type: type} = state, %Message{data: data} = message) do
      process_span_context = Polyn.Tracing.current_context()

      Polyn.Tracing.subscribe_span type, message.metadata[:headers] do
        Polyn.Tracing.link_to_context(process_span_context)

        case JSON.deserialize(data, store_name: store_name) do
          {:ok, event} ->
            Polyn.Tracing.span_attributes(
              conn: state.connection_options.connection_name,
              type: type,
              event: event,
              payload: data
            )

            Message.update_data(message, fn _data -> event end)

          {:error, error} ->
            message
            |> Message.configure_ack(on_failure: :term)
            |> Message.failed(error)
        end
      end
    end

    defp handle_invalid_messages!(messages, %{ack_ref: ack_ref, acknowledger: acknowledger}) do
      if any_invalid?(messages) do
        # Treat all messages as failed since some are invalid. The ones that are valid
        # will send a NACK to indicate they weren't processed and should be sent again;
        # the invalid ones will be given TERM so they aren't sent again.
        acknowledger.ack(ack_ref, [], messages)

        raise Polyn.ValidationException, combine_invalid_message_errors(messages)
      end
    end

    defp any_invalid?(messages) do
      Enum.any?(messages, &message_invalid?/1)
    end

    defp message_invalid?(message), do: message.status != :ok

    defp combine_invalid_message_errors(messages) do
      messages
      |> Enum.filter(&message_invalid?/1)
      |> Enum.map_join("\n", fn
        %{status: {:failed, reason}} -> reason
        _ -> ""
      end)
    end

    defp store_name(opts) do
      Keyword.get(opts, :store_name, SchemaStore.store_name())
    end

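    # The consumer name is derived from the event `:type` and optional `:source`;
    # the stream name is looked up from the server unless one was passed in.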
    defp add_consumer_and_stream_name(opts) do
      type = Keyword.fetch!(opts, :type)
      source = Keyword.get(opts, :source)
      consumer_name = Polyn.Naming.consumer_name(type, source)

      opts
      |> Keyword.put(:stream_name, stream_name(opts))
      |> Keyword.put(:consumer_name, consumer_name)
    end

    defp stream_name(opts) do
      opts[:stream_name] ||
        Polyn.Jetstream.lookup_stream_name!(opts[:connection_name], opts[:type])
    end

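    # Swap in mocks when sandbox mode is enabled (typically in tests).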
    defp producer(opts) do
      if sandbox(opts), do: Polyn.Jetstream.MockProducer, else: OffBroadway.Jetstream.Producer
    end

    defp acknowledger(opts) do
      if sandbox(opts),
        do: Polyn.Jetstream.MockAcknowledger,
        else: OffBroadway.Jetstream.Acknowledger
    end

    # If the Application config dictates "sandbox mode", we prioritize that;
    # otherwise we defer to a passed-in option.
    defp sandbox(opts) do
      case Application.get_env(:polyn, :sandbox) do
        nil -> Keyword.get(opts, :sandbox, false)
        other -> other
      end
    end
  end
end