lib/altworx_book.ex

defmodule AltworxBook do
  @moduledoc """
  AltworxBook provides functions to interact with Altworx from Livebook notebooks.

  Currently supported:
  * Normalizer pipeline development and debugging

  ## Basic usage
  The notebook currently needs to run on the same machine as Altworx. Connect to a running
  Altworx instance with `AltworxBook.connect_to_altworx/0`. The library reads the environment
  variable `ALTWORX_BOOK_ENV` to infer the Altworx node name; the default value is `prod`.
  When Livebook is started in a local dev environment on the host Docker network, set
  `ALTWORX_BOOK_ENV` to `dev`.

  Once connected, you can call general-purpose functions, such as listing all topics via
  `AltworxBook.list_topics/0` or only raw topics via `AltworxBook.list_raw_topics/0`.
  Raw messages can be listed with `AltworxBook.list_raw_messages/4`, and a single message
  can be fetched by its offset with `AltworxBook.get_raw_message/2`.

  ## Normalizer pipeline development and debugging
  The whole pipeline can be executed in a notebook on a list of raw messages using
  `AltworxBook.run_pipeline/2`. Messages can be defined manually or retrieved from an
  Altworx instance. The pipeline definition is similar to the configuration in `config.ini`;
  the only difference is that normalization functions are passed as function references,
  e.g. `Normalizer.Basic.decode` becomes `&Normalizer.Basic.decode/1`.

  Normalization results can be displayed as VegaLite graphs in a Kino layout by calling
  `AltworxBook.visualize_pipeline_results/1`.

  You can also debug a pipeline one function at a time via `AltworxBook.run_pipeline_step/2`,
  which executes a single normalization function. This is useful in combination with
  printing intermediate results using `IO.inspect/2` or `dbg/0`.
  """

  alias AltworxBook.Altworx
  alias AltworxBook.Kafka
  alias AltworxBook.Kafka.RawTopic
  alias AltworxBook.Normalizer.Pipeline
  alias Toolbox.Message

  @doc """
  Connects to a running Altworx node.

  The Altworx node is expected to be running on the same machine as the caller process.
  Returns `:ok` on success or `{:error, reason}` otherwise.
  """
  @spec connect_to_altworx :: :ok | {:error, term}
  def connect_to_altworx, do: Altworx.connect()

  @doc """
  Lists all topics Altworx has access to.

  ## Examples
      iex> AltworxBook.list_topics()
      ["N6_rt_reality_network_updates", "reality_network_updates"]
  """
  @spec list_topics :: [String.t()]
  def list_topics, do: Kafka.list_topics()

  @doc """
  Lists all raw topics Altworx has access to.

  Raw topics contain raw messages received by Altworx Acceptors, which the Normalizer
  pipeline processes and transforms into Runtime messages.

  ## Examples
      iex> AltworxBook.list_raw_topics()
      ["reality_network_updates"]
  """
  @spec list_raw_topics :: [String.t()]
  def list_raw_topics, do: RawTopic.list()

  @doc """
  Returns a list of messages from a Kafka raw topic.

  ## Args
   * `topic_name` - name of the Kafka topic
   * `since` - timestamp in ms from which messages are returned
   * `count` - number of messages to return
   * `options` (optional, defaults to `[]`)
     * `:json` - decode the JSON-encoded value into an Elixir term
     * `:just_value` - return only the value, without the metadata envelope

  ## Examples
      iex> AltworxBook.list_raw_messages("reality_network_updates", 0, 1)
      [%{key: "", offset: 0, timestamp: 1638430188202, value: "{\"timestamp\": 1638430188202, \"type\": \"upsert_asset\"}"}]

      iex> AltworxBook.list_raw_messages("reality_network_updates", 0, 1, [:just_value])
      ["{\"timestamp\": 1638430188202, \"type\": \"upsert_asset\"}"]

      iex> AltworxBook.list_raw_messages("reality_network_updates", 0, 1, [:json])
      [%{key: "", offset: 0, timestamp: 1638430188202, value: %{timestamp: 1638430188202, type: "upsert_asset"}}]
  """
  @spec list_raw_messages(
          topic_name :: String.t(),
          since :: Altworx.timestamp(),
          count :: integer,
          options :: [:json | :just_value]
        ) ::
          [
            %{
              key: String.t(),
              offset: integer,
              timestamp: Altworx.timestamp(),
              value: binary() | any()
            }
          ]
          | [binary() | any()]
  def list_raw_messages(topic_name, since, count, options \\ []),
    do: RawTopic.list_messages(topic_name, since, count, options)

  @doc """
  Returns a message from a raw topic by its `offset`.

  Returns `{:ok, message}` when the message is found, or `{:error, reason}` otherwise.

  ## Examples
      iex> AltworxBook.get_raw_message("reality_network_updates", 0)
      {:ok, %{timestamp: 1638430188202, value: "{\"timestamp\": 1638430188202, \"type\": \"upsert_asset\"}"}}
  """
  @spec get_raw_message(topic_name :: String.t(), offset :: integer) ::
          {:ok, %{timestamp: Altworx.timestamp(), value: String.t()}} | {:error, term}
  def get_raw_message(topic_name, offset), do: RawTopic.get_message(topic_name, offset)

  @doc """
  Executes a pipeline definition on a list of messages.

  ## Args
   * `messages` - list of raw topic messages
   * `steps` - list of pipeline steps passed as function references

  ## Examples
      iex> messages = [%{timestamp: 1694680405146, value: "{\"timestamp\": 12345}"}]
      ...> pipeline = [&Normalizer.Basic.decode/1, &Normalizer.Basic.set_random_type/1]
      ...> AltworxBook.run_pipeline(messages, pipeline)
      [{:new, %Toolbox.Message{type: "a", timestamp: 1694680405146, body: %{"timestamp" => 12345}}}]
  """
  @spec run_pipeline([RawTopic.message()], [Pipeline.pipeline_step()]) :: [
          Pipeline.pipeline_step_result()
        ]
  def run_pipeline(messages, steps), do: Pipeline.run(messages, steps)

  @doc """
  Evaluates a single pipeline step on the given message.

  The input may also be the result of a previous step (`{:error, reason}`, `:ignored`
  or a `Toolbox.Message`), so steps can be chained manually.

  ## Args
   * `message` - raw topic message, or the output of a previous step
   * `step` - pipeline step passed as a function reference

  ## Examples
      iex> message = %{timestamp: 1694680405146, value: "{\"timestamp\": 12345}"}
      ...> AltworxBook.run_pipeline_step(message, &Normalizer.Basic.decode/1)
      %Toolbox.Message{type: nil, timestamp: 1694680405146, body: %{"timestamp" => 12345}}
  """
  @spec run_pipeline_step(
          RawTopic.message() | {:error, term} | :ignored | Message.t(),
          Pipeline.pipeline_step()
        ) :: {:error, term} | :ignored | Message.t()
  def run_pipeline_step(message, step_fn), do: Pipeline.run_step(message, step_fn)

  @doc """
  Validates a pipeline output message produced by evaluating pipeline steps one by one.

  Validation is already built into running the whole pipeline via
  `AltworxBook.run_pipeline/2`. When evaluating steps individually with
  `AltworxBook.run_pipeline_step/2`, call this function to check that all required
  fields of the resulting message have valid values.

  ## Args
   * `message` - pipeline output message

  ## Examples
      iex> message = %{timestamp: 1694680405146, value: "{\"timestamp\": 12345}"}
      ...> output_message = AltworxBook.run_pipeline_step(message, &Normalizer.Basic.decode/1)
      ...> AltworxBook.validate_pipeline_step_output_message(output_message)
      :ok
  """
  @spec validate_pipeline_step_output_message(Message.t() | term) :: :ok | {:error, term}
  def validate_pipeline_step_output_message(message),
    do: Pipeline.validate_output_message(message)

  @doc """
  Displays the results of `AltworxBook.run_pipeline/2`.

  Call this function from a notebook on the output of `AltworxBook.run_pipeline/2` to
  render a Kino presentation of the results.

  ## Examples
      iex> messages = [%{timestamp: 1694680405146, value: "{\"timestamp\": 12345}"}]
      ...> pipeline = [&Normalizer.Basic.decode/1, &Normalizer.Basic.set_random_type/1]
      ...> results = AltworxBook.run_pipeline(messages, pipeline)
      ...> AltworxBook.visualize_pipeline_results(results)
  """
  # Input type mirrors the return type of run_pipeline/2; the concrete Kino value returned
  # by Pipeline.visualize_results/1 is not visible here, hence `term`.
  @spec visualize_pipeline_results([Pipeline.pipeline_step_result()]) :: term
  def visualize_pipeline_results(results), do: Pipeline.visualize_results(results)
end