lib/noizu_weaviate.ex

defmodule Noizu.Weaviate do
  @moduledoc """
  Noizu.Weaviate is a library providing a simple wrapper around Weaviate's API calls.
  It handles various API features such as meta information, batch operations, backups,
  schema operations, nodes information, data objects, and classification.

  ## Configuration

  To configure the library, you need to set the Weaviate API key in your application's configuration:

      config :noizu_weaviate,
        weaviate_api_key: "your_api_key_here"
  """

  require Logger
  # -------------------------------
  # Global Types
  # -------------------------------
  @type error_tuple :: {:error, details :: term}

  # option constraints
  @type consistency_level_option() :: String.t()

  # common types
  @type api_response() :: {:ok, map()} | {:error, any()}

  # Common Type
  @type options() :: map()

  @weaviate_base Application.compile_env(:noizu_weaviate, :endpoint, "http://api.weaviate.com/")

  require Finch

  def weaviate_base(), do: @weaviate_base
  def api_base(), do: @weaviate_base

  # -------------------------------
  #
  # -------------------------------
  def generic_stream_provider(callback) do
    fn event, payload ->
      case event do
        {:status, code} ->
          %{payload | status: code}

        {:headers, headers} ->
          %{payload | headers: headers}

        {:data, data} ->
          n =
            String.split(data, "\n\ndata:")
            |> Enum.map(fn data ->
              case Jason.decode(data, keys: :atoms) do
                {:ok, json} ->
                  case json do
                    %{:choices => [%{:delta => %{:content => c}, :finish_reason => _} | _]} -> c
                    _ -> nil
                  end

                _ ->
                  nil
              end
            end)
            |> Enum.filter(& &1)
            |> Enum.join("")

          payload = %{payload | message: payload.message <> n}
          # Call the provided callback function with the payload
          callback.(payload)

        _ ->
          payload
      end
    end
  end

  # -------------------------------
  #
  # -------------------------------
  @doc """
  A helper function to make API calls to the OpenAI API. This function handles both non-stream and stream API calls.

  ## Parameters

  - type: The HTTP request method (e.g., :get, :post, :put, :patch, :delete)
  - url: The full URL for the API endpoint
  - body: The request body in map format
  - model: The model to be used for the response processing
  - options
    - stream: A boolean value to indicate whether the request should be processed as a stream or not (default: false)
    - raw: return raw response
    - response_log_callback: function(finch) callback for request log.
    - response_log_callback: function(finch, start_ms) callback for response log.

  ## Returns

  Returns a tuple {:ok, response} on successful API call, where response is the decoded JSON response in map format.
  Returns {:error, term} on failure, where term contains error details.
  """
  def api_call(type, url, body, model, options \\ nil) do
    stream = options[:stream] || false
    raw = options[:raw] || false

    if stream do
      with {:ok, body} <- (body && Jason.encode(body)) || {:ok, nil},
           {:ok, r = %{status: 200, message: _}} <- api_call_stream(type, url, body, options) do
        {:ok, r}
        # apply(model, :from_json, [json])
      else
        error ->
          Logger.warn("STREAM API ERROR: \n #{inspect(error)}")
          error
      end
    else
      with {:ok, body} <- (body && Jason.encode(body)) || {:ok, nil},
           {:ok, %Finch.Response{status: 200, body: body}} <-
             api_call_fetch(type, url, body, options),
           {:ok, json} <- (!raw && ( (String.length(body) > 0) && Jason.decode(body, keys: :atoms) || {:ok, nil} )) || {:ok, body} do
        cond do
          model in [nil, :json] -> {:ok, json}
          raw -> {:ok, apply(model, :from_binary, [json])}
          :else -> {:ok, apply(model, :from_json, [json])}
        end
      else
        error ->
          Logger.warn("API ERROR: \n #{inspect(error)}")
          error
      end
    end
  end

  # -------------------------------
  #
  # -------------------------------
  def headers() do
    [
      {"Content-Type", "application/json"}
    ]
    |> then(fn headers ->
      headers
    end)
  end

  # -------------------------------
  #
  # -------------------------------
  def put_field(body, field, options, default \\ nil)

  def put_field(body, :stream, options, default) do
    flag = (options[:stream] && true) || default
    Map.put(body, :stream, flag)
  end

  def put_field(body, {field_alias, field}, options, default) do
    if v = options[field_alias] || options[field] || default do
      Map.put(body, field, v)
    else
      body
    end
  end

  def put_field(body, field, options, default) do
    if v = options[field] || default do
      Map.put(body, field, v)
    else
      body
    end
  end

  # -------------------------------
  #
  # -------------------------------
  defp api_call_fetch(type, url, body, options) do
    ts = :os.system_time(:millisecond)

    request =
      Finch.build(type, url, headers(), body)
      |> tap(fn finch ->
        case request_log_callback = options[:request_log_callback] do
          nil -> :nop
          v when is_function(v, 1) -> v.(finch)
          {m, f} -> apply(m, f, [finch])
          _ -> :nop
        end
      end)

    # |> IO.inspect(label: "API_CALL_FETCH", limit: :infinity, printable_limit: :infinity, pretty: true)
    request
    |> Finch.request(Noizu.Weaviate.Finch,
      pool_timeout: 600_000,
      receive_timeout: 600_000,
      request_timeout: 600_000
    )
    |> tap(fn finch ->
      case response_log_callback = options[:response_log_callback] do
        nil -> :nop
        v when is_function(v, 3) -> v.(finch, request, ts)
        {m, f} -> apply(m, f, [finch, request, ts])
        _ -> :nop
      end
    end)
  end

  # -------------------------------
  #
  # -------------------------------
  defp api_call_stream(type, url, body, options) do
    callback = options[:stream]
    raw = options[:raw]
    ts = :os.system_time(:millisecond)

    request =
      Finch.build(type, url, headers(), body)
      |> tap(fn finch ->
        case request_log_callback = options[:request_log_callback] do
          nil -> :nop
          v when is_function(v, 1) -> v.(finch)
          {m, f} -> apply(m, f, [finch])
          _ -> :nop
        end
      end)

    request
    |> Finch.stream(Noizu.Weaviate.Finch, %{status: nil, raw: raw, message: ""}, callback,
      timeout: 600_000,
      receive_timeout: 600_000
    )
    |> tap(fn finch ->
      case response_log_callback = options[:response_log_callback] do
        nil -> :nop
        v when is_function(v, 3) -> v.(finch, request, ts)
        {m, f} -> apply(m, f, [finch, request, ts])
        _ -> :nop
      end
    end)
  end
end