lib/gnat/jetstream/api/consumer.ex

defmodule Gnat.Jetstream.API.Consumer do
  @moduledoc """
  A module representing a NATS JetStream Consumer.

  Learn more about consumers: https://docs.nats.io/nats-concepts/jetstream/consumers

  ## The Jetstream.API.Consumer struct

  The struct's only mandatory field to set is the `:stream_name`. The rest will have
  the NATS default values set.

  Note that consumers are ephemeral by default. Set the `:durable_name` to make it durable.

  Consumer struct fields explanation:

  * `:stream_name` - name of a stream the consumer is pointing at.
  * `:domain` - JetStream domain the stream is on.
  * `:ack_policy` - how the messages should be acknowledged. It has the following options:
      - `:explicit` - the default policy. It means that each individual message must be acknowledged.
        It is the only allowed option for pull consumers.
      - `:none` - no need to ack messages, the server will assume ack on delivery.
      - `:all` - only the last received message needs to be acked, all the previous messages received
        are automatically acknowledged.
  * `:ack_wait` - time in nanoseconds that server will wait for an ack for any individual. If an ack
     is not received in time, the message will be redelivered.
  * `:backoff` - list of durations that represents a retry timescale for NAK'd messages or those being
     normally retried.
  * `:deliver_group` - when set, will only deliver messages to subscriptions matching that group.
  * `:deliver_policy` - specifies where in the stream it wants to start receiving messages. It has the
     following options:
       - `:all` - the default policy. The consumer will start receiving from the earliest available
         message.
       - `:last` - the consumer will start receiving messages with the last message added to the stream.
       - `:new` - the consumer will only start receiving messages that were created after the customer
         was created.
       - `:by_start_sequence` - the consumer is required to specify `:opt_start_seq`, the sequence number
         to start on. It will receive the closest available message moving forward in the sequence
         should the message specified have been removed based on the stream limit policy.
       - `:by_start_time` - the consumer will start with messages on or after this time. The consumer is
         required to specify `:opt_start_time`, the time in the stream to start at.
       - `:last_per_subject` - the consumer will start with the latest one for each filtered subject
         currently  in the stream.
  * `:deliver_subject` - the subject to deliver observed messages. Not allowed for pull subscriptions.
    A delivery subject is required for queue subscribing as it configures a subject that all the queue
    consumers should listen on.
  * `:description` - a short description of the purpose of this customer.
  * `:durable_name` - the name of the consumer, which the server will track, allowing resuming consumption
    where left off. By default, a consumer is ephemeral. To make the consumer durable, set the name.
    See [naming](https://docs.nats.io/running-a-nats-service/nats_admin/jetstream_admin/naming).
  * `:filter_subject` - when consuming from a stream with a wildcard subject, this allows you to select
    a subset of the full wildcard subject to receive messages from.
  * `:flow_control` - when set to true, an empty message with Status header 100 and a reply subject will
    be sent. Consumers must reply to these messages to control the rate of message delivery.
  * `:headers_only` - delivers only the headers of messages in the stream and not the bodies. Additionally
    adds the Nats-Msg-Size header to indicate the size of the removed payload.
  * `:idle_heartbeat` - if set, the server will regularly send a status message to the client while there
    are  no new messages to send. This lets the client know that the JetStream service is still up and
    running, even when there is no activity on the stream. The message status header will have a code of 100.
    Unlike `:flow_control`, it will have no reply to address. It may have a description like
    "Idle Heartbeat".
  * `:inactive_threshold` - duration that instructs the server to clean up ephemeral consumers that are
    inactive for that long.
  * `:max_ack_pending` - it sets the maximum number of messages without an acknowledgement that can be
    outstanding, once this limit is reached, message delivery will be suspended. It cannot be used with
    `:ack_none` ack policy. This maximum number of pending acks applies for all the consumer's
    subscriber processes. A value of -1 means there can be any number of pending acks (i.e. no flow
    control).
  * `:max_batch` - the largest batch property that may be specified when doing a pull on a Pull consumer.
  * `:max_deliver` - the maximum number of times a specific message will be delivered. Applies to any
    message that is re-sent due to ack policy.
  * `:max_expires` - the maximum expires value that may be set when doing a pull on a Pull consumer.
  * `:max_waiting` - the number of pulls that can be outstanding on a pull consumer, pulls received after
    this is reached are ignored.
  * `:opt_start_seq` - use with `:deliver_policy` set to `:by_start_sequence`. It represents the sequence
    number to start consuming on.
  * `:opt_start_time` - use with `:deliver_policy` set to `:by_start_time`. It represents the time to start
    consuming at.
  * `:rate_limit_bps` - used to throttle the delivery of messages to the consumer, in bits per second.
  * `:replay_policy` - it applies when the `:deliver_policy` is set to `:all`, `:by_start_sequence` or
    `:by_start_time`. It has the following options:
       - `:instant` - the default policy. The messages will be pushed to the client as fast as possible.
       - `:original` - the messages in the stream will be pushed to the client at the same rate that they
         were originally received.
  * `:sample_freq` - Sets the percentage of acknowledgements that should be sampled for observability, 0-100.
    This value is a binary and for example allows both `30` and `30%` as valid values.
  """

  import Gnat.Jetstream.API.Util

  @enforce_keys [:stream_name]
  defstruct [
    :backoff,
    :deliver_group,
    :deliver_subject,
    :description,
    :domain,
    :durable_name,
    :filter_subject,
    :flow_control,
    :headers_only,
    :idle_heartbeat,
    :inactive_threshold,
    :max_batch,
    :max_expires,
    :max_waiting,
    :opt_start_seq,
    :opt_start_time,
    :rate_limit_bps,
    :sample_freq,
    :stream_name,
    ack_policy: :explicit,
    ack_wait: 30_000_000_000,
    deliver_policy: :all,
    max_ack_pending: 20_000,
    max_deliver: -1,
    replay_policy: :instant
  ]

  @type t :: %__MODULE__{
          stream_name: binary(),
          domain: nil | binary(),
          ack_policy: :none | :all | :explicit,
          ack_wait: nil | non_neg_integer(),
          backoff: nil | [non_neg_integer()],
          deliver_group: nil | binary(),
          deliver_policy:
            :all | :last | :new | :by_start_sequence | :by_start_time | :last_per_subject,
          deliver_subject: nil | binary(),
          description: nil | binary(),
          durable_name: nil | binary(),
          filter_subject: nil | binary(),
          flow_control: nil | boolean(),
          headers_only: nil | boolean(),
          idle_heartbeat: nil | non_neg_integer(),
          inactive_threshold: nil | non_neg_integer(),
          max_ack_pending: nil | integer(),
          max_batch: nil | integer(),
          max_deliver: nil | integer(),
          max_expires: nil | non_neg_integer(),
          max_waiting: nil | integer(),
          opt_start_seq: nil | non_neg_integer(),
          opt_start_time: nil | DateTime.t(),
          rate_limit_bps: nil | non_neg_integer(),
          replay_policy: :instant | :original,
          sample_freq: nil | binary()
        }

  @type info :: %{
          ack_floor: %{
            consumer_seq: non_neg_integer(),
            stream_seq: non_neg_integer()
          },
          cluster:
            nil
            | %{
                optional(:name) => binary(),
                optional(:leader) => binary(),
                optional(:replicas) => [
                  %{
                    :active => non_neg_integer(),
                    :current => boolean(),
                    :name => binary(),
                    optional(:lag) => non_neg_integer(),
                    optional(:offline) => boolean()
                  }
                ]
              },
          config: config(),
          created: DateTime.t(),
          delivered: %{
            consumer_seq: non_neg_integer(),
            stream_seq: non_neg_integer()
          },
          name: binary(),
          num_ack_pending: non_neg_integer(),
          num_pending: non_neg_integer(),
          num_redelivered: non_neg_integer(),
          num_waiting: non_neg_integer(),
          push_bound: nil | boolean(),
          stream_name: binary()
        }

  @type config :: %{
          ack_policy: :none | :all | :explicit,
          ack_wait: nil | non_neg_integer(),
          backoff: nil | [non_neg_integer()],
          deliver_group: nil | binary(),
          deliver_policy:
            :all | :last | :new | :by_start_sequence | :by_start_time | :last_per_subject,
          deliver_subject: nil | binary(),
          description: nil | binary(),
          durable_name: nil | binary(),
          filter_subject: nil | binary(),
          flow_control: nil | boolean(),
          headers_only: nil | boolean(),
          idle_heartbeat: nil | non_neg_integer(),
          inactive_threshold: nil | non_neg_integer(),
          max_ack_pending: nil | integer(),
          max_batch: nil | integer(),
          max_deliver: nil | integer(),
          max_expires: nil | non_neg_integer(),
          max_waiting: nil | integer(),
          opt_start_seq: nil | non_neg_integer(),
          opt_start_time: nil | DateTime.t(),
          rate_limit_bps: nil | non_neg_integer(),
          replay_policy: :instant | :original,
          sample_freq: nil | binary()
        }

  @type consumers :: %{
          consumers: list(binary()),
          limit: non_neg_integer(),
          offset: non_neg_integer(),
          total: non_neg_integer()
        }

  @doc """
  Creates a consumer. When consumer's `:durable_name` field is not set, the function
  creates an ephemeral consumer. Otherwise, it creates a durable consumer.

  ## Examples

      iex> {:ok, _response} = Gnat.Jetstream.API.Stream.create(:gnat, %Gnat.Jetstream.API.Stream{name: "astream", subjects: ["subject"]})
      iex> {:ok, %{name: "consumer", stream_name: "astream"}} = Gnat.Jetstream.API.Consumer.create(:gnat, %Gnat.Jetstream.API.Consumer{durable_name: "consumer", stream_name: "astream"})

      iex> {:ok, _response} = Gnat.Jetstream.API.Stream.create(:gnat, %Gnat.Jetstream.API.Stream{name: "astream", subjects: ["subject"]})
      iex> {:error, %{"description" => "consumer delivery policy is deliver by start sequence, but optional start sequence is not set"}} = Gnat.Jetstream.API.Consumer.create(:gnat, %Gnat.Jetstream.API.Consumer{durable_name: "consumer", stream_name: "astream", deliver_policy: :by_start_sequence})

  """
  @spec create(conn :: Gnat.t(), consumer :: t()) :: {:ok, info()} | {:error, term()}
  def create(conn, %__MODULE__{durable_name: name} = consumer) when not is_nil(name) do
    create_topic =
      "#{js_api(consumer.domain)}.CONSUMER.DURABLE.CREATE.#{consumer.stream_name}.#{name}"

    with :ok <- validate_durable(consumer),
         {:ok, raw_response} <- request(conn, create_topic, create_payload(consumer)) do
      {:ok, to_info(raw_response)}
    end
  end

  def create(conn, %__MODULE__{} = consumer) do
    create_topic = "#{js_api(consumer.domain)}.CONSUMER.CREATE.#{consumer.stream_name}"

    with :ok <- validate(consumer),
         {:ok, raw_response} <- request(conn, create_topic, create_payload(consumer)) do
      {:ok, to_info(raw_response)}
    end
  end

  @doc """
  Deletes a consumer.

  ## Examples

      iex> {:ok, _response} = Gnat.Jetstream.API.Stream.create(:gnat, %Gnat.Jetstream.API.Stream{name: "astream", subjects: ["subject"]})
      iex> {:ok, _response} = Gnat.Jetstream.API.Consumer.create(:gnat, %Gnat.Jetstream.API.Consumer{durable_name: "consumer", stream_name: "astream"})
      iex> Gnat.Jetstream.API.Consumer.delete(:gnat, "astream", "consumer")
      :ok

      iex> {:error, %{"code" => 404, "description" => "stream not found"}} = Gnat.Jetstream.API.Consumer.delete(:gnat, "wrong_stream", "consumer")

  """
  @spec delete(
          conn :: Gnat.t(),
          stream_name :: binary(),
          consumer_name :: binary(),
          domain :: nil | binary()
        ) ::
          :ok | {:error, any()}
  def delete(conn, stream_name, consumer_name, domain \\ nil) do
    topic = "#{js_api(domain)}.CONSUMER.DELETE.#{stream_name}.#{consumer_name}"

    with {:ok, _response} <- request(conn, topic, "") do
      :ok
    end
  end

  @doc """
  Information about the consumer.

  ## Examples

      iex> {:ok, _response} = Gnat.Jetstream.API.Stream.create(:gnat, %Gnat.Jetstream.API.Stream{name: "astream", subjects: ["subject"]})
      iex> {:ok, _response} = Gnat.Jetstream.API.Consumer.create(:gnat, %Gnat.Jetstream.API.Consumer{durable_name: "consumer", stream_name: "astream"})
      iex> {:ok, %{created: _}} = Gnat.Jetstream.API.Consumer.info(:gnat, "astream", "consumer")

      iex>  {:error, %{"code" => 404, "description" => "stream not found"}} = Gnat.Jetstream.API.Consumer.info(:gnat, "wrong_stream", "consumer")

  """
  @spec info(
          conn :: Gnat.t(),
          stream_name :: binary(),
          consumer_name :: binary(),
          domain :: nil | binary()
        ) ::
          {:ok, info()} | {:error, any()}
  def info(conn, stream_name, consumer_name, domain \\ nil) do
    topic = "#{js_api(domain)}.CONSUMER.INFO.#{stream_name}.#{consumer_name}"

    with {:ok, raw} <- request(conn, topic, "") do
      {:ok, to_info(raw)}
    end
  end

  @doc """
  Paged list of known consumers, including their current info.

  ## Examples

      iex> {:ok, _response} =  Gnat.Jetstream.API.Stream.create(:gnat, %Gnat.Jetstream.API.Stream{name: "astream", subjects: ["subject"]})
      iex> {:ok, %{consumers: _, limit: 1024, offset: 0, total: _}} = Gnat.Jetstream.API.Consumer.list(:gnat, "astream")

      iex> {:error, %{"code" => 404, "description" => "stream not found"}} = Gnat.Jetstream.API.Consumer.list(:gnat, "wrong_stream")

  """
  @spec list(
          conn :: Gnat.t(),
          stream_name :: binary(),
          params :: [offset: non_neg_integer(), domain: nil | binary()]
        ) ::
          {:ok, consumers()} | {:error, term()}
  def list(conn, stream_name, params \\ []) do
    domain = Keyword.get(params, :domain)

    payload =
      Jason.encode!(%{
        offset: Keyword.get(params, :offset, 0)
      })

    with {:ok, raw} <- request(conn, "#{js_api(domain)}.CONSUMER.NAMES.#{stream_name}", payload) do
      response = %{
        consumers: Map.get(raw, "consumers"),
        limit: Map.get(raw, "limit"),
        offset: Map.get(raw, "offset"),
        total: Map.get(raw, "total")
      }

      {:ok, response}
    end
  end

  @doc """
  Requests a next message from a stream to be consumed. The response (consumed message)will be sent
  on the subject given as the `reply_to` parameter.

  ## Options

  * `batch` - How many messages to receive. Messages will be sent to the `reply_to` subject
    separately. Defaults to 1.

  * `expires` - Time in nanoseconds the request will be kept in the server. Once this time passes
    a message with empty body and topic set to `reply_to` subject is sent. Useful when polling
    the server frequently and not wanting the pull requests to accumulate. By default, the pull
    request stays in the server until a message comes.

  * `no_wait` - Boolean value which indicates whether the pull request should be accumulated on
    the server. When set to true and no message is present to be consumed, a message with empty
    body and topic value set to `reply_to` is sent. Defaults to false.

  ## Example

      iex> {:ok, _response} = Gnat.Jetstream.API.Stream.create(:gnat, %Gnat.Jetstream.API.Stream{name: "astream", subjects: ["subject"]})
      iex> {:ok, _response} = Gnat.Jetstream.API.Consumer.create(:gnat, %Gnat.Jetstream.API.Consumer{durable_name: "consumer", stream_name: "astream"})
      iex> {:ok, _sid} = Gnat.sub(:gnat, self(), "reply_subject")
      iex> :ok = Gnat.Jetstream.API.Consumer.request_next_message(:gnat, "astream", "consumer", "reply_subject")
      iex> :ok = Gnat.pub(:gnat, "subject", "message1")
      iex> assert_receive {:msg, %{body: "message1", topic: "subject"}}
  """
  @spec request_next_message(
          conn :: Gnat.t(),
          stream_name :: binary(),
          consumer_name :: binary(),
          reply_to :: String.t(),
          domain :: nil | binary(),
          opts :: keyword()
        ) :: :ok
  def request_next_message(
        conn,
        stream_name,
        consumer_name,
        reply_to,
        domain \\ nil,
        opts \\ []
      ) do
    default_payload = %{batch: 1}

    put_option_if_not_nil = fn payload, option_key ->
      if option_value = opts[option_key] do
        Map.put(payload, option_key, option_value)
      else
        payload
      end
    end

    payload =
      default_payload
      |> put_option_if_not_nil.(:batch)
      |> put_option_if_not_nil.(:no_wait)
      |> put_option_if_not_nil.(:expires)
      |> Jason.encode!()

    Gnat.pub(
      conn,
      "#{js_api(domain)}.CONSUMER.MSG.NEXT.#{stream_name}.#{consumer_name}",
      payload,
      reply_to: reply_to
    )
  end

  # https://docs.nats.io/running-a-nats-service/configuration/leafnodes/jetstream_leafnodes
  defp js_api(nil), do: "$JS.API"
  defp js_api(""), do: "$JS.API"
  defp js_api(domain), do: "$JS.#{domain}.API"

  defp create_payload(%__MODULE__{} = cons) do
    %{
      config: %{
        ack_policy: cons.ack_policy,
        ack_wait: cons.ack_wait,
        backoff: cons.backoff,
        deliver_group: cons.deliver_group,
        deliver_policy: cons.deliver_policy,
        deliver_subject: cons.deliver_subject,
        description: cons.description,
        durable_name: cons.durable_name,
        filter_subject: cons.filter_subject,
        flow_control: cons.flow_control,
        headers_only: cons.headers_only,
        idle_heartbeat: cons.idle_heartbeat,
        inactive_threshold: cons.inactive_threshold,
        max_ack_pending: cons.max_ack_pending,
        max_batch: cons.max_batch,
        max_deliver: cons.max_deliver,
        max_expires: cons.max_expires,
        max_waiting: cons.max_waiting,
        opt_start_seq: cons.opt_start_seq,
        opt_start_time: cons.opt_start_time,
        rate_limit_bps: cons.rate_limit_bps,
        replay_policy: cons.replay_policy,
        sample_freq: cons.sample_freq
      },
      stream_name: cons.stream_name
    }
    |> Jason.encode!()
  end

  defp to_config(raw) do
    %{
      ack_policy: raw |> Map.get("ack_policy") |> to_sym(),
      ack_wait: raw |> Map.get("ack_wait"),
      backoff: Map.get(raw, "backoff"),
      deliver_group: Map.get(raw, "deliver_group"),
      deliver_policy: raw |> Map.get("deliver_policy") |> to_sym(),
      deliver_subject: raw |> Map.get("deliver_subject"),
      description: Map.get(raw, "description"),
      durable_name: Map.get(raw, "durable_name"),
      filter_subject: raw |> Map.get("filter_subject"),
      flow_control: Map.get(raw, "flow_control"),
      headers_only: Map.get(raw, "headers_only"),
      idle_heartbeat: Map.get(raw, "idle_heartbeat"),
      inactive_threshold: Map.get(raw, "inactive_threshold"),
      max_ack_pending: Map.get(raw, "max_ack_pending"),
      max_batch: Map.get(raw, "max_batch"),
      max_deliver: Map.get(raw, "max_deliver"),
      max_expires: Map.get(raw, "max_expires"),
      max_waiting: Map.get(raw, "max_waiting"),
      opt_start_seq: raw |> Map.get("opt_start_seq"),
      opt_start_time: raw |> Map.get("opt_start_time") |> to_datetime(),
      rate_limit_bps: Map.get(raw, "rate_limit_bps"),
      replay_policy: raw |> Map.get("replay_policy") |> to_sym(),
      sample_freq: Map.get(raw, "sample_freq")
    }
  end

  defp to_info(raw) do
    %{
      ack_floor: %{
        consumer_seq: get_in(raw, ["ack_floor", "consumer_seq"]),
        stream_seq: get_in(raw, ["ack_floor", "stream_seq"])
      },
      cluster: Map.get(raw, "cluster"),
      config: to_config(Map.get(raw, "config")),
      created: raw |> Map.get("created") |> to_datetime(),
      delivered: %{
        consumer_seq: get_in(raw, ["delivered", "consumer_seq"]),
        stream_seq: get_in(raw, ["delivered", "stream_seq"])
      },
      name: Map.get(raw, "name"),
      num_ack_pending: Map.get(raw, "num_ack_pending"),
      num_pending: Map.get(raw, "num_pending"),
      num_redelivered: Map.get(raw, "num_redelivered"),
      num_waiting: Map.get(raw, "num_waiting"),
      push_bound: Map.get(raw, "push_bound"),
      stream_name: Map.get(raw, "stream_name")
    }
  end

  defp validate(consumer) do
    cond do
      consumer.stream_name == nil ->
        {:error, "must have a :stream_name set"}

      is_binary(consumer.stream_name) == false ->
        {:error, "stream_name must be a string"}

      valid_name?(consumer.stream_name) == false ->
        {:error, "invalid stream_name: " <> invalid_name_message()}

      true ->
        :ok
    end
  end

  defp validate_durable(consumer) do
    with :ok <- validate(consumer) do
      cond do
        is_binary(consumer.durable_name) == false ->
          {:error, "durable_name must be a string"}

        valid_name?(consumer.durable_name) == false ->
          {:error, "invalid durable_name: " <> invalid_name_message()}

        true ->
          :ok
      end
    end
  end
end