lib/gnat/jetstream/api/object.ex

defmodule Gnat.Jetstream.API.Object do
  @moduledoc """
  API for interacting with the JetStream Object Store

  Learn more about Object Store: https://docs.nats.io/nats-concepts/jetstream/obj_store
  """
  alias Gnat.Jetstream.API.{Consumer, Stream, Util}
  alias Gnat.Jetstream.API.Object.Meta

  @stream_prefix "OBJ_"
  @subject_prefix "$O."

  @type bucket_opt ::
          {:description, String.t()}
          | {:max_bucket_size, integer()}
          | {:max_chunk_size, integer()}
          | {:placement, Stream.placement()}
          | {:replicas, non_neg_integer()}
          | {:storage, :file | :memory}
          | {:ttl, non_neg_integer()}
  @spec create_bucket(Gnat.t(), String.t(), list(bucket_opt)) ::
          {:ok, Stream.info()} | {:error, any()}
  def create_bucket(conn, bucket_name, params \\ []) do
    with :ok <- validate_bucket_name(bucket_name) do
      stream = %Stream{
        name: stream_name(bucket_name),
        subjects: stream_subjects(bucket_name),
        description: Keyword.get(params, :description),
        discard: :new,
        allow_rollup_hdrs: true,
        max_age: Keyword.get(params, :ttl, 0),
        max_bytes: Keyword.get(params, :max_bucket_size, -1),
        max_msg_size: Keyword.get(params, :max_chunk_size, -1),
        num_replicas: Keyword.get(params, :replicas, 1),
        storage: Keyword.get(params, :storage, :file),
        placement: Keyword.get(params, :placement),
        duplicate_window: adjust_duplicate_window(Keyword.get(params, :ttl, 0))
      }

      Stream.create(conn, stream)
    end
  end

  @spec delete_bucket(Gnat.t(), String.t()) :: :ok | {:error, any}
  def delete_bucket(conn, bucket_name) do
    Stream.delete(conn, stream_name(bucket_name))
  end

  @spec delete(Gnat.t(), String.t(), String.t()) :: :ok | {:error, any}
  def delete(conn, bucket_name, object_name) do
    with {:ok, meta} <- info(conn, bucket_name, object_name),
         meta <- %Meta{meta | deleted: true},
         topic <- meta_stream_topic(bucket_name, object_name),
         {:ok, body} <- Jason.encode(meta),
         {:ok, _msg} <- Gnat.request(conn, topic, body, headers: [{"Nats-Rollup", "sub"}]) do
      filter = chunk_stream_topic(meta)
      Stream.purge(conn, stream_name(bucket_name), nil, %{filter: filter})
    end
  end

  @spec get(Gnat.t(), String.t(), String.t(), (binary -> any())) :: :ok | {:error, any}
  def get(conn, bucket_name, object_name, chunk_fun) do
    with {:ok, %{config: _stream}} <- Stream.info(conn, stream_name(bucket_name)),
         {:ok, meta} <- info(conn, bucket_name, object_name) do
      receive_chunks(conn, meta, chunk_fun)
    end
  end

  @spec info(Gnat.t(), String.t(), String.t()) :: {:ok, Meta.t()} | {:error, any}
  def info(conn, bucket_name, object_name) do
    with {:ok, _stream_info} <- Stream.info(conn, stream_name(bucket_name)) do
      Stream.get_message(conn, stream_name(bucket_name), %{
        last_by_subj: meta_stream_topic(bucket_name, object_name)
      })
      |> case do
        {:ok, message} ->
          meta = json_to_meta(message.data)
          {:ok, meta}

        error ->
          error
      end
    end
  end

  @type list_option :: {:show_deleted, boolean()}
  @spec list(Gnat.t(), String.t(), list(list_option())) :: {:error, any} | {:ok, list(Meta.t())}
  def list(conn, bucket_name, options \\ []) do
    with {:ok, %{config: stream}} <- Stream.info(conn, stream_name(bucket_name)),
         topic <- Util.reply_inbox(),
         {:ok, sub} <- Gnat.sub(conn, self(), topic),
         {:ok, consumer} <-
           Consumer.create(conn, %Consumer{
             stream_name: stream.name,
             deliver_subject: topic,
             deliver_policy: :last_per_subject,
             filter_subject: meta_stream_subject(bucket_name),
             ack_policy: :none,
             max_ack_pending: nil,
             replay_policy: :instant,
             max_deliver: 1
           }),
         {:ok, messages} <- receive_all_metas(sub, consumer.num_pending) do
      :ok = Gnat.unsub(conn, sub)
      :ok = Consumer.delete(conn, stream.name, consumer.name)

      show_deleted = Keyword.get(options, :show_deleted, false)

      if show_deleted do
        {:ok, messages}
      else
        {:ok, Enum.reject(messages, &(&1.deleted == true))}
      end
    end
  end

  @spec put(Gnat.t(), String.t(), String.t(), File.io_device()) ::
          {:ok, Meta.t()} | {:error, any()}
  def put(conn, bucket_name, object_name, io) do
    nuid = Util.nuid()
    chunk_topic = chunk_stream_topic(bucket_name, nuid)

    with {:ok, %{config: _}} <- Stream.info(conn, stream_name(bucket_name)),
         :ok <- purge_prior_chunks(conn, bucket_name, object_name),
         {:ok, chunks, size, digest} <- send_chunks(conn, io, chunk_topic) do
      object_meta = %Meta{
        name: object_name,
        bucket: bucket_name,
        nuid: nuid,
        size: size,
        chunks: chunks,
        digest: "SHA-256=#{Base.url_encode64(digest)}"
      }

      topic = meta_stream_topic(bucket_name, object_name)
      body = Jason.encode!(object_meta)

      case Gnat.request(conn, topic, body, headers: [{"Nats-Rollup", "sub"}]) do
        {:ok, _} ->
          {:ok, object_meta}

        error ->
          error
      end
    end
  end

  defp stream_name(bucket_name) do
    "#{@stream_prefix}#{bucket_name}"
  end

  defp stream_subjects(bucket_name) do
    [
      chunk_stream_subject(bucket_name),
      meta_stream_subject(bucket_name)
    ]
  end

  defp chunk_stream_subject(bucket_name) do
    "#{@subject_prefix}#{bucket_name}.C.>"
  end

  defp chunk_stream_topic(bucket_name, nuid) do
    "#{@subject_prefix}#{bucket_name}.C.#{nuid}"
  end

  defp chunk_stream_topic(%Meta{bucket: bucket, nuid: nuid}) do
    "#{@subject_prefix}#{bucket}.C.#{nuid}"
  end

  defp meta_stream_subject(bucket_name) do
    "#{@subject_prefix}#{bucket_name}.M.>"
  end

  defp meta_stream_topic(bucket_name, object_name) do
    key = Base.url_encode64(object_name)
    "#{@subject_prefix}#{bucket_name}.M.#{key}"
  end

  @two_minutes_in_nanoseconds 1_200_000_000
  # The `duplicate_window` can't be greater than the `max_age`. The default `duplicate_window`
  # is 2 minutes. We'll keep the 2 minute window UNLESS the ttl is less than 2 minutes
  defp adjust_duplicate_window(ttl) when ttl > 0 and ttl < @two_minutes_in_nanoseconds, do: ttl
  defp adjust_duplicate_window(_ttl), do: @two_minutes_in_nanoseconds

  defp json_to_meta(json) do
    raw = Jason.decode!(json)

    %{
      "bucket" => bucket,
      "chunks" => chunks,
      "digest" => digest,
      "name" => name,
      "nuid" => nuid,
      "size" => size
    } = raw

    %Meta{
      bucket: bucket,
      chunks: chunks,
      digest: digest,
      deleted: Map.get(raw, "deleted", false),
      name: name,
      nuid: nuid,
      size: size
    }
  end

  defp purge_prior_chunks(conn, bucket, name) do
    case info(conn, bucket, name) do
      {:ok, meta} ->
        Stream.purge(conn, stream_name(bucket), nil, %{filter: chunk_stream_topic(meta)})

      {:error, %{"code" => 404}} ->
        :ok

      {:error, other} ->
        {:error, other}
    end
  end

  defp receive_all_metas(sid, num_pending, messages \\ [])

  defp receive_all_metas(_sid, 0, messages) do
    {:ok, messages}
  end

  defp receive_all_metas(sid, remaining, messages) do
    receive do
      {:msg, %{sid: ^sid, body: body}} ->
        meta = json_to_meta(body)
        receive_all_metas(sid, remaining - 1, [meta | messages])
    after
      10_000 ->
        {:error, :timeout_waiting_for_messages}
    end
  end

  defp receive_chunks(conn, %Meta{} = meta, chunk_fun) do
    topic = chunk_stream_topic(meta)
    stream = stream_name(meta.bucket)
    inbox = Util.reply_inbox()
    {:ok, sub} = Gnat.sub(conn, self(), inbox)

    {:ok, consumer} =
      Consumer.create(conn, %Consumer{
        stream_name: stream,
        deliver_subject: inbox,
        deliver_policy: :all,
        filter_subject: topic,
        ack_policy: :none,
        max_ack_pending: nil,
        replay_policy: :instant,
        max_deliver: 1
      })

    :ok = receive_chunks(sub, meta.chunks, chunk_fun)

    :ok = Gnat.unsub(conn, sub)
    :ok = Consumer.delete(conn, stream, consumer.name)
  end

  defp receive_chunks(_sub, 0, _chunk_fun) do
    :ok
  end

  defp receive_chunks(sub, remaining, chunk_fun) do
    receive do
      {:msg, %{sid: ^sub, body: body}} ->
        chunk_fun.(body)
        receive_chunks(sub, remaining - 1, chunk_fun)
    after
      10_000 ->
        {:error, :timeout_waiting_for_messages}
    end
  end

  @chunk_size 128 * 1024
  defp send_chunks(conn, io, topic) do
    sha = :crypto.hash_init(:sha256)
    size = 0
    chunks = 0
    send_chunks(conn, io, topic, sha, size, chunks)
  end

  defp send_chunks(conn, io, topic, sha, size, chunks) do
    case IO.binread(io, @chunk_size) do
      :eof ->
        sha = :crypto.hash_final(sha)
        {:ok, chunks, size, sha}

      {:error, err} ->
        {:error, err}

      bytes ->
        sha = :crypto.hash_update(sha, bytes)
        size = size + byte_size(bytes)
        chunks = chunks + 1

        case Gnat.request(conn, topic, bytes) do
          {:ok, _} ->
            send_chunks(conn, io, topic, sha, size, chunks)

          error ->
            error
        end
    end
  end

  defp validate_bucket_name(name) do
    case Regex.match?(~r/^[a-zA-Z0-9_-]+$/, name) do
      true -> :ok
      false -> {:error, "invalid bucket name"}
    end
  end
end