lib/momento/internal/scs_data_client.ex

defmodule Momento.Internal.ScsDataClient do
  alias Momento.Auth.CredentialProvider
  alias Momento.Responses.{Set, Get, Delete}
  alias Momento.Requests.CollectionTtl
  import Momento.Validation

  @enforce_keys [:auth_token, :channel]
  defstruct [:auth_token, :channel]

  @moduledoc false

  @opaque t() :: %__MODULE__{
            auth_token: String.t(),
            channel: GRPC.Channel.t()
          }

  defimpl Inspect, for: Momento.Internal.ScsDataClient do
    def inspect(%Momento.Internal.ScsDataClient{} = data_client, _opts) do
      "#Momento.Internal.ScsDataClient<auth_token: [hidden], channel: #{inspect(data_client.channel)}>"
    end
  end

  @spec create(CredentialProvider.t()) :: {:ok, t()} | {:error, any()}
  def create(credential_provider) do
    cache_endpoint = CredentialProvider.cache_endpoint(credential_provider)
    tls_options = :tls_certificate_check.options(cache_endpoint)

    with {:ok, channel} <-
           GRPC.Stub.connect(cache_endpoint <> ":443",
             cred: GRPC.Credential.new(ssl: tls_options)
           ) do
      {:ok,
       %__MODULE__{
         auth_token: CredentialProvider.auth_token(credential_provider),
         channel: channel
       }}
    end
  end

  @spec set(
          data_client :: t(),
          cache_name :: String.t(),
          key :: binary(),
          value :: binary(),
          ttl_seconds :: number()
        ) :: Momento.Responses.Set.t()
  def set(data_client, cache_name, key, value, ttl_seconds) do
    with :ok <- validate_cache_name(cache_name),
         {:ok, ttl_milliseconds} <- get_ttl_milliseconds(ttl_seconds) do
      try do
        metadata = create_metadata(cache_name, data_client)

        set_request = %Momento.Protos.CacheClient.SetRequest{
          cache_key: key,
          cache_body: value,
          ttl_milliseconds: ttl_milliseconds
        }

        case Momento.Protos.CacheClient.Scs.Stub.set(data_client.channel, set_request,
               metadata: metadata
             ) do
          {:ok, _} -> {:ok, %Set.Ok{}}
          {:error, error_response} -> {:error, Momento.Error.convert(error_response)}
        end
      rescue
        e -> {:error, Momento.Error.convert(e)}
      end
    else
      error -> error
    end
  end

  @spec get(data_client :: t(), cache_name :: String.t(), key :: binary()) ::
          Momento.Responses.Get.t()
  def get(data_client, cache_name, key) do
    with :ok <- validate_cache_name(cache_name) do
      try do
        metadata = create_metadata(cache_name, data_client)

        get_request = %Momento.Protos.CacheClient.GetRequest{cache_key: key}

        case Momento.Protos.CacheClient.Scs.Stub.get(data_client.channel, get_request,
               metadata: metadata
             ) do
          {:ok, %Momento.Protos.CacheClient.GetResponse{result: :Hit, cache_body: cache_body}} ->
            {:ok, %Get.Hit{value: cache_body}}

          {:ok, %Momento.Protos.CacheClient.GetResponse{result: :Miss}} ->
            :miss

          {:error, error_response} ->
            {:error, Momento.Error.convert(error_response)}
        end
      rescue
        e -> {:error, Momento.Error.convert(e)}
      end
    else
      error -> error
    end
  end

  @spec delete(data_client :: t(), cache_name :: String.t(), key :: binary()) ::
          Momento.Responses.Delete.t()
  def delete(data_client, cache_name, key) do
    with :ok <- validate_cache_name(cache_name) do
      try do
        metadata = create_metadata(cache_name, data_client)

        delete_request = %Momento.Protos.CacheClient.DeleteRequest{cache_key: key}

        case Momento.Protos.CacheClient.Scs.Stub.delete(data_client.channel, delete_request,
               metadata: metadata
             ) do
          {:ok, _} -> {:ok, %Delete.Ok{}}
          {:error, error_response} -> {:error, Momento.Error.convert(error_response)}
        end
      rescue
        e -> {:error, Momento.Error.convert(e)}
      end
    else
      error -> error
    end
  end

  @spec sorted_set_put_elements(
          data_client :: t(),
          cache_name :: String.t(),
          sorted_set_name :: String.t(),
          elements :: %{binary() => number()} | [{binary(), number()}],
          collection_ttl :: CollectionTtl.t()
        ) :: Momento.Responses.SortedSet.PutElements.t()
  def sorted_set_put_elements(
        data_client,
        cache_name,
        sorted_set_name,
        elements,
        collection_ttl
      ) do
    with :ok <- validate_cache_name(cache_name),
         :ok <- validate_sorted_set_name(sorted_set_name),
         {:ok, ttl_milliseconds} <- get_ttl_milliseconds(collection_ttl),
         {:ok, transformed_elements} <- transform_sorted_set_elements(elements) do
      try do
        metadata = create_metadata(cache_name, data_client)

        sorted_set_put_request = %Momento.Protos.CacheClient.SortedSetPutRequest{
          set_name: sorted_set_name,
          elements: transformed_elements,
          ttl_milliseconds: ttl_milliseconds,
          refresh_ttl: collection_ttl.refresh_ttl
        }

        case Momento.Protos.CacheClient.Scs.Stub.sorted_set_put(
               data_client.channel,
               sorted_set_put_request,
               metadata: metadata
             ) do
          {:ok, _} -> {:ok, %Momento.Responses.SortedSet.PutElements.Ok{}}
          {:error, error_response} -> {:error, Momento.Error.convert(error_response)}
        end
      rescue
        e -> {:error, Momento.Error.convert(e)}
      end
    else
      error -> error
    end
  end

  @spec transform_sorted_set_elements(
          elements :: %{binary() => number()} | [{binary(), number()}]
        ) :: {:ok, [%Momento.Protos.CacheClient.SortedSetElement{}]} | {:error, Momento.Error.t()}
  defp transform_sorted_set_elements(elements) when is_map(elements) or is_list(elements) do
    try do
      transformed_elements =
        Enum.map(elements, fn {value, score} ->
          %Momento.Protos.CacheClient.SortedSetElement{
            value: value,
            score: score
          }
        end)

      {:ok, transformed_elements}
    catch
      _type, _reason ->
        {:error,
         Momento.Error.invalid_argument(
           "elements must be a list of {binary, number} tuples or a map of binary to number"
         )}
    end
  end

  defp transform_sorted_set_elements(_elements),
    do: {:error, Momento.Error.invalid_argument("elements must be a map or a list")}

  @spec sorted_set_fetch_by_rank(
          data_client :: t(),
          cache_name :: String.t(),
          sorted_set_name :: String.t(),
          start_rank :: integer() | nil,
          end_rank :: integer() | nil,
          sort_order :: :asc | :desc
        ) :: Momento.Responses.SortedSet.Fetch.t()
  def sorted_set_fetch_by_rank(
        data_client,
        cache_name,
        sorted_set_name,
        start_rank \\ nil,
        end_rank \\ nil,
        sort_order \\ :asc
      ) do
    with :ok <- validate_cache_name(cache_name),
         :ok <- validate_sorted_set_name(sorted_set_name) do
      try do
        metadata = create_metadata(cache_name, data_client)

        start_index =
          case start_rank do
            nil -> {:unbounded_start, %Momento.Protos.CacheClient.Unbounded{}}
            _ -> {:inclusive_start_index, start_rank}
          end

        end_index =
          case end_rank do
            nil -> {:unbounded_end, %Momento.Protos.CacheClient.Unbounded{}}
            _ -> {:exclusive_end_index, end_rank}
          end

        order =
          case sort_order do
            :asc -> 0
            _ -> 1
          end

        fetch_request = %Momento.Protos.CacheClient.SortedSetFetchRequest{
          set_name: sorted_set_name,
          order: order,
          with_scores: true,
          range:
            {:by_index,
             %Momento.Protos.CacheClient.SortedSetFetchRequest.ByIndex{
               start: start_index,
               end: end_index
             }}
        }

        case Momento.Protos.CacheClient.Scs.Stub.sorted_set_fetch(
               data_client.channel,
               fetch_request,
               metadata: metadata
             ) do
          {:ok, response} ->
            case response.sorted_set do
              {:found, found} ->
                {:values_with_scores, values_with_scores} = found.elements

                scored_values =
                  Enum.map(values_with_scores.elements, fn element ->
                    {element.value, element.score}
                  end)

                {:ok, %Momento.Responses.SortedSet.Fetch.Hit{value: scored_values}}

              {:missing, _} ->
                :miss
            end

          {:error, error_response} ->
            {:error, Momento.Error.convert(error_response)}
        end
      rescue
        e -> {:error, Momento.Error.convert(e)}
      end
    else
      error -> error
    end
  end

  @spec sorted_set_fetch_by_score(
          data_client :: t(),
          cache_name :: String.t(),
          sorted_set_name :: String.t(),
          min_score :: number() | nil,
          max_score :: number() | nil,
          offset :: integer() | nil,
          count :: integer() | nil,
          sort_order :: :asc | :desc
        ) :: Momento.Responses.SortedSet.Fetch.t()
  def sorted_set_fetch_by_score(
        data_client,
        cache_name,
        sorted_set_name,
        min_score,
        max_score,
        offset,
        count,
        sort_order
      ) do
    with :ok <- validate_cache_name(cache_name),
         :ok <- validate_sorted_set_name(sorted_set_name) do
      try do
        metadata = create_metadata(cache_name, data_client)

        request_min_score =
          case min_score do
            nil ->
              {:unbounded_min, %Momento.Protos.CacheClient.Unbounded{}}

            _ ->
              {:min_score,
               %Momento.Protos.CacheClient.SortedSetFetchRequest.ByScore.Score{
                 score: min_score,
                 exclusive: false
               }}
          end

        request_max_score =
          case max_score do
            nil ->
              {:unbounded_max, %Momento.Protos.CacheClient.Unbounded{}}

            _ ->
              {:max_score,
               %Momento.Protos.CacheClient.SortedSetFetchRequest.ByScore.Score{
                 score: max_score,
                 exclusive: false
               }}
          end

        request_offset =
          case offset do
            nil -> 0
            _ -> offset
          end

        request_count =
          case count do
            nil -> -1
            _ -> count
          end

        request_order =
          case sort_order do
            :asc -> 0
            _ -> 1
          end

        fetch_request = %Momento.Protos.CacheClient.SortedSetFetchRequest{
          set_name: sorted_set_name,
          order: request_order,
          with_scores: true,
          range:
            {:by_score,
             %Momento.Protos.CacheClient.SortedSetFetchRequest.ByScore{
               min: request_min_score,
               max: request_max_score,
               offset: request_offset,
               count: request_count
             }}
        }

        case Momento.Protos.CacheClient.Scs.Stub.sorted_set_fetch(
               data_client.channel,
               fetch_request,
               metadata: metadata
             ) do
          {:ok, response} ->
            case response.sorted_set do
              {:found, found} ->
                {:values_with_scores, values_with_scores} = found.elements

                scored_values =
                  Enum.map(values_with_scores.elements, fn element ->
                    {element.value, element.score}
                  end)

                {:ok, %Momento.Responses.SortedSet.Fetch.Hit{value: scored_values}}

              {:missing, _} ->
                :miss
            end

          {:error, error_response} ->
            {:error, Momento.Error.convert(error_response)}
        end
      rescue
        e -> {:error, Momento.Error.convert(e)}
      end
    else
      error -> error
    end
  end

  @spec sorted_set_remove_elements(
          data_client :: t(),
          cache_name :: String.t(),
          sorted_set_name :: String.t(),
          values :: [binary()]
        ) :: Momento.Responses.SortedSet.RemoveElements.t()
  def sorted_set_remove_elements(
        data_client,
        cache_name,
        sorted_set_name,
        values
      ) do
    with :ok <- validate_cache_name(cache_name),
         :ok <- validate_sorted_set_name(sorted_set_name) do
      try do
        metadata = create_metadata(cache_name, data_client)

        remove_request = %Momento.Protos.CacheClient.SortedSetRemoveRequest{
          set_name: sorted_set_name,
          remove_elements:
            {:some, %Momento.Protos.CacheClient.SortedSetRemoveRequest.Some{values: values}}
        }

        case Momento.Protos.CacheClient.Scs.Stub.sorted_set_remove(
               data_client.channel,
               remove_request,
               metadata: metadata
             ) do
          {:ok, _} ->
            {:ok, %Momento.Responses.SortedSet.RemoveElements.Ok{}}

          {:error, error} ->
            {:error, Momento.Error.convert(error)}
        end
      rescue
        e -> {:error, Momento.Error.convert(e)}
      end
    else
      error -> error
    end
  end

  @spec sorted_set_get_rank(
          data_client :: t(),
          cache_name :: String.t(),
          sorted_set_name :: String.t(),
          value :: binary(),
          sort_order :: :asc | :desc
        ) :: Momento.Responses.SortedSet.GetRank.t()
  def sorted_set_get_rank(
        data_client,
        cache_name,
        sorted_set_name,
        value,
        sort_order
      ) do
    with :ok <- validate_cache_name(cache_name),
         :ok <- validate_sorted_set_name(sorted_set_name) do
      try do
        metadata = create_metadata(cache_name, data_client)

        request_order =
          case sort_order do
            :asc -> 0
            _ -> 1
          end

        get_rank_request = %Momento.Protos.CacheClient.SortedSetGetRankRequest{
          set_name: sorted_set_name,
          value: value,
          order: request_order
        }

        case Momento.Protos.CacheClient.Scs.Stub.sorted_set_get_rank(
               data_client.channel,
               get_rank_request,
               metadata: metadata
             ) do
          {:ok, response} ->
            case response do
              %Momento.Protos.CacheClient.SortedSetGetRankResponse{
                rank:
                  {:element_rank,
                   %Momento.Protos.CacheClient.SortedSetGetRankResponse.RankResponsePart{
                     result: :Hit,
                     rank: rank
                   }}
              } ->
                {:ok, %Momento.Responses.SortedSet.GetRank.Hit{rank: rank}}

              %Momento.Protos.CacheClient.SortedSetGetRankResponse{
                rank:
                  {:element_rank,
                   %Momento.Protos.CacheClient.SortedSetGetRankResponse.RankResponsePart{
                     result: :Miss
                   }}
              } ->
                :miss

              %Momento.Protos.CacheClient.SortedSetGetRankResponse{
                rank:
                  {:missing,
                   %Momento.Protos.CacheClient.SortedSetGetRankResponse.SortedSetMissing{}}
              } ->
                :miss
            end

          {:error, error} ->
            {:error, Momento.Error.convert(error)}
        end
      rescue
        e -> {:error, Momento.Error.convert(e)}
      end
    else
      error -> error
    end
  end

  @spec sorted_set_get_score(
          data_client :: t(),
          cache_name :: String.t(),
          sorted_set_name :: String.t(),
          value :: binary()
        ) :: Momento.Responses.SortedSet.GetScore.t()
  def sorted_set_get_score(
        data_client,
        cache_name,
        sorted_set_name,
        value
      ) do
    try do
      case sorted_set_get_scores(
             data_client,
             cache_name,
             sorted_set_name,
             [value]
           ) do
        {:ok, %Momento.Responses.SortedSet.GetScores.Hit{value: values}} ->
          case values do
            [] ->
              :miss

            _ ->
              case Enum.find(values, fn {key, _} -> key == value end) do
                nil ->
                  :miss

                {_, score} ->
                  case score do
                    nil -> :miss
                    _ -> {:ok, %Momento.Responses.SortedSet.GetScore.Hit{score: score}}
                  end
              end
          end

        :miss ->
          :miss

        {:error, error} ->
          {:error, error}
      end
    rescue
      e -> {:error, Momento.Error.convert(e)}
    end
  end

  @spec sorted_set_get_scores(
          data_client :: t(),
          cache_name :: String.t(),
          sorted_set_name :: String.t(),
          values :: [binary()]
        ) :: Momento.Responses.SortedSet.GetScores.t()
  def sorted_set_get_scores(
        data_client,
        cache_name,
        sorted_set_name,
        values
      ) do
    with :ok <- validate_cache_name(cache_name),
         :ok <- validate_sorted_set_name(sorted_set_name) do
      try do
        metadata = create_metadata(cache_name, data_client)

        get_scores_request = %Momento.Protos.CacheClient.SortedSetGetScoreRequest{
          set_name: sorted_set_name,
          values: values
        }

        case Momento.Protos.CacheClient.Scs.Stub.sorted_set_get_score(
               data_client.channel,
               get_scores_request,
               metadata: metadata
             ) do
          {:ok, response} ->
            case response do
              %Momento.Protos.CacheClient.SortedSetGetScoreResponse{
                sorted_set:
                  {:found,
                   %Momento.Protos.CacheClient.SortedSetGetScoreResponse.SortedSetFound{
                     elements: elements
                   }}
              } ->
                values_to_scores =
                  Enum.zip(values, elements)
                  |> Enum.map(fn {value, element} ->
                    case element.result do
                      :Hit -> {value, element.score}
                      :Miss -> {value, nil}
                    end
                  end)

                {:ok, %Momento.Responses.SortedSet.GetScores.Hit{value: values_to_scores}}

              %Momento.Protos.CacheClient.SortedSetGetScoreResponse{
                sorted_set:
                  {:missing,
                   %Momento.Protos.CacheClient.SortedSetGetScoreResponse.SortedSetMissing{}}
              } ->
                :miss
            end

          {:error, error} ->
            {:error, Momento.Error.convert(error)}
        end
      rescue
        e -> {:error, Momento.Error.convert(e)}
      end
    else
      error -> error
    end
  end

  @spec sorted_set_increment_score(
          data_client :: t(),
          cache_name :: String.t(),
          sorted_set_name :: String.t(),
          value :: binary(),
          amount :: number(),
          collection_ttl :: CollectionTtl.t()
        ) :: Momento.Responses.SortedSet.IncrementScore.t()
  def sorted_set_increment_score(
        data_client,
        cache_name,
        sorted_set_name,
        value,
        amount,
        collection_ttl
      ) do
    with :ok <- validate_cache_name(cache_name),
         :ok <- validate_sorted_set_name(sorted_set_name),
         {:ok, ttl_milliseconds} <- get_ttl_milliseconds(collection_ttl) do
      try do
        metadata = create_metadata(cache_name, data_client)

        increment_request = %Momento.Protos.CacheClient.SortedSetIncrementRequest{
          set_name: sorted_set_name,
          value: value,
          amount: amount,
          ttl_milliseconds: ttl_milliseconds,
          refresh_ttl: collection_ttl.refresh_ttl
        }

        case Momento.Protos.CacheClient.Scs.Stub.sorted_set_increment(
               data_client.channel,
               increment_request,
               metadata: metadata
             ) do
          {:ok, response} ->
            {:ok, %Momento.Responses.SortedSet.IncrementScore.Ok{score: response.score}}

          {:error, error} ->
            {:error, Momento.Error.convert(error)}
        end
      rescue
        e -> {:error, Momento.Error.convert(e)}
      end
    else
      error -> error
    end
  end

  @spec get_ttl_milliseconds(ttl :: number() | CollectionTtl.t() | nil) ::
          {:ok, number()} | {:error, Momento.Error.t()}
  defp get_ttl_milliseconds(nil),
    do: {:ok, nil}

  defp get_ttl_milliseconds(ttl) when is_number(ttl),
    do: {:ok, ttl |> Kernel.*(1000) |> round()}

  defp get_ttl_milliseconds(%{ttl_seconds: ttl}) when is_number(ttl),
    do: {:ok, ttl |> Kernel.*(1000) |> round()}

  defp get_ttl_milliseconds(%{ttl_seconds: nil}),
    do: {:ok, nil}

  defp get_ttl_milliseconds(ttl),
    do: {:error, Momento.Error.invalid_argument("Unable to parse TTL from #{ttl}")}

  @agent_data_key "__#{__MODULE__}_AGENT_DATA_SENT__"

  defp should_send_agent_data? do
    :erlang.get(@agent_data_key) == :undefined
  end

  @spec create_metadata(String.t(), t()) :: %{required(String.t()) => String.t()}
  defp create_metadata(cache_name, data_client) do
    base_metadata = %{
      "cache" => cache_name,
      "authorization" => data_client.auth_token
    }

    if should_send_agent_data?() do
      :erlang.put(@agent_data_key, true)

      IO.puts("Versions:")
      IO.puts(get_library_version())
      IO.puts(System.version())

      Map.merge(base_metadata, %{
        # example agent: "elixir:cache:0.6.6"
        "agent" => "elixir:cache:" <> get_library_version(),
        # example runtime-version: "1.16.2"
        "runtime-version" => System.version()
      })
    else
      base_metadata
    end
  end

  @spec get_library_version() :: String.t()
  defp get_library_version do
    case Application.spec(:gomomento, :vsn) do
      nil -> "unknown"
      version -> to_string(version)
    end
  end
end