defmodule AltworxBook do
  @moduledoc """
  AltworxBook provides function to interact with Altworx from LiveBook notebooks.

  There is support for:
  * Normalizer pipeline development and debugging

  ## Basic usage
  Currently notebook needs to be running on the same machine, as Altworx. Connecting to
  running Altworx is done via `AltworxBook.connect_to_altworx/0`. Library uses env variable
  `ALTWORX_BOOK_ENV` to infer Altworx node name, default value is `prod`. In case LiveBook
  is started in local dev environment with host docker network, env var `ALTWORX_BOOK_ENV`
  should be set to `dev`.

  When connection is made, you can call other general purpose functions such as listing topics 
  via `AltworxBook.list_topics/0` or only raw topics via `AltworxBook.list_raw_topics/0`.
  Raw messages can be accessed by calling `AltworxBook.list_raw_messages/4`. A specific 
  message can be acquired with `AltworxBook.get_raw_message/2`.

  ## Normalizer pipeline development and debugging
  Whole pipeline can be executed in a notebook using `AltworxBook.run_pipeline/2` on a list of raw
  messages. Messages can be defined manually or retrieved from an Altworx instance. Pipeline
  definition is similar to the configuration in `config.ini`. The only difference is that
  normalization functions are passed as reference, e.g.

  Normalization results can be displayed as VegaLite graphs in Kino layout by calling

  You can also debug pipelines by function via `AltworxBook.run_pipeline_step/2` which
  executes a single normalization function. This is useful in combination with printing out
  intermediate results using `IO.inspect/2` or `dbg/0`.

  alias AltworxBook.Altworx
  alias AltworxBook.Kafka
  alias AltworxBook.Kafka.RawTopic
  alias AltworxBook.Normalizer.Pipeline
  alias Toolbox.Message

  @doc """
  Connect to running altworx node.

  Altworx node is expected to be running on the same machine as caller process.
  @spec connect_to_altworx :: :ok | {:error, term}
  def connect_to_altworx, do: Altworx.connect()

  @doc """
  List all topics altworx has access to.

  ## Examples
      iex> AltworxBook.list_topics()
      ["N6_rt_reality_network_updates", "reality_network_updates"]
  @spec list_topics :: [String.t()]
  def list_topics, do: Kafka.list_topics()

  @doc """
  List all raw topics Altworx has access to.

  Raw topics contain raw messages received by Altworx Acceptors for Normalizer pipeline to process
  and transform them into Runtime messages.

  ## Examples
      iex> AltworxBook.list_raw_topics()
  @spec list_raw_topics :: [String.t()]
  def list_raw_topics, do: RawTopic.list()

  @doc """
  Returns list of messages from kafka raw topic.

  ## Args
   * `topic_name` - name of kafka topic
   * `since` - timestamp in ms from which will messages be returned
   * `count` - count of the messages to return
   * `options` (optional)
     * `:json` - transform encoded JSON value to elixir term
     * `:just_value` - return only value without envelope with metadata

  ## Examples
      iex> AltworxBook.list_raw_messages("reality_network_updates", 0, 1)
      [%{key: "", offset: 0, timestamp: 1638430188202, value: "{\"timestamp\": 1638430188202, \"type\": \"upsert_asset\"}"]

      iex> AltworxBook.list_raw_messages("reality_network_updates", 0, 1, [:just_value])
      ["{\"timestamp\": 1638430188202, \"type\": \"upsert_asset\"}"]

      iex> AltworxBook.list_raw_messages("reality_network_updates", 0, 1, [:json])
      [%{key: "", offset: 0, timestamp: 1638430188202, value: %{timestamp: 1638430188202, type: "usert_asset"}]
  @spec list_raw_messages(
          topic_name :: String.t(),
          since :: Altworx.timestamp(),
          count :: integer,
          options :: [:json | :just_value]
        ) ::
              key: String.t(),
              offset: integer,
              timestamp: Altworx.timestamp(),
              value: binary() | any()
          | [binary() | any()]
  def list_raw_messages(topic_name, since, count, options \\ []),
    do: RawTopic.list_messages(topic_name, since, count, options)

  @doc """
  Return message from raw topic by its `offset`.

  ## Examples
      iex> AltworxBook.get_raw_message("reality_network_updates", 0)
      [%{timestamp: 1638430188202, value: "{\"timestamp\": 1638430188202, \"type\": \"upsert_asset\"}"}]
  @spec get_raw_message(topic_name :: String.t(), offset :: integer) ::
          {:ok, %{timestamp: Altworx.timestamp(), value: String.t()}} | {:error, term}
  def get_raw_message(topic_name, offset), do: RawTopic.get_message(topic_name, offset)

  @doc """
  Execute pipeline definition on list of messages.

  ## Args
   * `messages` - list of raw topic messages
   * `steps` - list of pipeline steps passed as function reference

  ## Examples
      iex> messages = [%{timestamp: 1694680405146, value: "{\"timestamp\": 12345}"}]
      ... pipeline = [&Normalizer.Basic.decode/1, &Normalizer.Basic.set_random_type/1]
      ... AltworxBook.run_pipeline(messages, pipeline)
      [{:new, %Toolbox.Message{type: "a", timestamp: 1694680405146, body: %{"timestamp" => 12345}}}]
  @spec run_pipeline([RawTopic.message()], [Pipeline.pipeline_step()]) :: [
  def run_pipeline(messages, steps), do:, steps)

  @doc """
  Evaluate pipeline step on given message.

  ## Args
   * `message` - raw topic message
   * `step` - pipeline step passed as function reference

  ## Examples
      iex> message = %{timestamp: 1694680405146, value: "{\"timestamp\": 12345}"}
      ... AltworxBook.run_pipeline_step(message, &Normalizer.Basic.decode/1)
      %Toolbox.Message{type: nil, timestamp: 1694680405146, body: %{"timestamp" => 12345}}
  @spec run_pipeline_step(
          RawTopic.message() | {:error, term} | :ignored | Message.t(),
        ) :: {:error, term} | :ignored | Message.t()
  def run_pipeline_step(message, step_fn), do: Pipeline.run_step(message, step_fn)

  @doc """
  Validate pipeline output message produced by evaluating pipeline steps one by one.

  There is validation baked into runing whole pipeline via `AltworxBook.run_pipeline/2`,
  we need to call this function to check that all required fields have valid values.

  ## Args
   * `message` - pipeline output message

  ## Examples
      iex> message = %{timestamp: 1694680405146, value: "{\"timestamp\": 12345}"}
      ... otput_message = AltworxBook.run_pipeline_step(message, &Normalizer.Basic.decode/1)
      ... AltworxBook.validate_pipeline_step_output_message(output_message)
  @spec validate_pipeline_step_output_message(Message.t() | term) :: :ok | {:error, term}
  def validate_pipeline_step_output_message(message),
    do: Pipeline.validate_output_message(message)

  @doc """
  Utility function to display results of `AltworxBook.run_pipeline/2`.

  Call this function from your notebook on outputs of `AltworxBook.run_pipeline/2` and
  kino presentation will be displayed.

  ## Examples
      iex> messages = [%{timestamp: 1694680405146, value: "{\"timestamp\": 12345}"}]
      ... pipeline = [&Normalizer.Basic.decode/1, &Normalizer.Basic.set_random_type/1]
      ... results = AltworxBook.run_pipeline(messages, pipeline)
      ... AltworxBook.visualize_pipeline_results(results)
  def visualize_pipeline_results(results), do: Pipeline.visualize_results(results)