defmodule AltworxBook do
@moduledoc """
AltworxBook provides functions for interacting with Altworx from Livebook notebooks.
There is support for:
* Normalizer pipeline development and debugging
## Basic usage
Currently, the notebook must run on the same machine as Altworx. Connect to a running
Altworx instance with `AltworxBook.connect_to_altworx/0`. The library uses the environment
variable `ALTWORX_BOOK_ENV` to infer the Altworx node name; the default value is `prod`. If
Livebook is started in a local dev environment with the host Docker network, set
`ALTWORX_BOOK_ENV` to `dev`.
Once connected, you can call general-purpose functions, such as listing all topics
via `AltworxBook.list_topics/0` or only raw topics via `AltworxBook.list_raw_topics/0`.
Raw messages can be read with `AltworxBook.list_raw_messages/4`. A specific
message can be fetched by its offset with `AltworxBook.get_raw_message/2`.
## Normalizer pipeline development and debugging
The whole pipeline can be executed in a notebook by calling `AltworxBook.run_pipeline/2` on a
list of raw messages. Messages can be defined manually or retrieved from an Altworx instance.
The pipeline definition is similar to the configuration in `config.ini`. The only difference
is that normalization functions are passed as function references (e.g.
`Normalizer.Basic.decode` becomes `&Normalizer.Basic.decode/1`).
Normalization results can be displayed as VegaLite graphs in a Kino layout by calling
`AltworxBook.visualize_pipeline_results/1`.
You can also debug a pipeline one function at a time with `AltworxBook.run_pipeline_step/2`,
which executes a single normalization function. This is useful in combination with printing
intermediate results using `IO.inspect/2` or `dbg/0`.
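
The workflow described in this module can be sketched as a single notebook session. This is
only a sketch under stated assumptions: a running Altworx node on the same machine, a raw
topic named `reality_network_updates` (an illustrative name taken from the examples in this
module), and messages returned by `AltworxBook.list_raw_messages/4` being accepted as
pipeline input. No output is shown because it depends on the data in the topic.

```elixir
# Connect to the local Altworx node; the match fails loudly if the connection fails.
:ok = AltworxBook.connect_to_altworx()

# Read 10 raw messages with a timestamp of 0 ms or later (topic name is illustrative).
messages = AltworxBook.list_raw_messages("reality_network_updates", 0, 10)

# Pipeline steps are function references, mirroring the entries in config.ini.
pipeline = [&Normalizer.Basic.decode/1, &Normalizer.Basic.set_random_type/1]

# Run the whole pipeline and display the results in the notebook.
results = AltworxBook.run_pipeline(messages, pipeline)
AltworxBook.visualize_pipeline_results(results)

# Debug a single step: run it on the first message and print the intermediate result.
# Assumes the topic returned at least one message; the match raises otherwise.
[first | _rest] = messages

first
|> AltworxBook.run_pipeline_step(&Normalizer.Basic.decode/1)
|> IO.inspect(label: "after decode")
```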
"""
alias AltworxBook.Altworx
alias AltworxBook.Kafka
alias AltworxBook.Kafka.RawTopic
alias AltworxBook.Normalizer.Pipeline
alias Toolbox.Message
@doc """
Connect to a running Altworx node.
The Altworx node is expected to be running on the same machine as the calling process.
"""
@spec connect_to_altworx :: :ok | {:error, term}
def connect_to_altworx, do: Altworx.connect()
@doc """
List all topics Altworx has access to.
## Examples
iex> AltworxBook.list_topics()
["N6_rt_reality_network_updates", "reality_network_updates"]
"""
@spec list_topics :: [String.t()]
def list_topics, do: Kafka.list_topics()
@doc """
List all raw topics Altworx has access to.
Raw topics contain raw messages received by Altworx Acceptors, which the Normalizer pipeline
processes and transforms into Runtime messages.
## Examples
iex> AltworxBook.list_raw_topics()
["reality_network_updates"]
"""
@spec list_raw_topics :: [String.t()]
def list_raw_topics, do: RawTopic.list()
@doc """
Return a list of messages from a Kafka raw topic.
## Args
* `topic_name` - name of the Kafka topic
* `since` - timestamp in milliseconds from which messages are returned
* `count` - number of messages to return
* `options` (optional)
* `:json` - decode the JSON-encoded value into an Elixir term
* `:just_value` - return only the value, without the metadata envelope
## Examples
iex> AltworxBook.list_raw_messages("reality_network_updates", 0, 1)
[%{key: "", offset: 0, timestamp: 1638430188202, value: "{\"timestamp\": 1638430188202, \"type\": \"upsert_asset\"}"}]
iex> AltworxBook.list_raw_messages("reality_network_updates", 0, 1, [:just_value])
["{\"timestamp\": 1638430188202, \"type\": \"upsert_asset\"}"]
iex> AltworxBook.list_raw_messages("reality_network_updates", 0, 1, [:json])
[%{key: "", offset: 0, timestamp: 1638430188202, value: %{timestamp: 1638430188202, type: "upsert_asset"}}]
"""
@spec list_raw_messages(
topic_name :: String.t(),
since :: Altworx.timestamp(),
count :: integer,
options :: [:json | :just_value]
) ::
[
%{
key: String.t(),
offset: integer,
timestamp: Altworx.timestamp(),
value: binary() | any()
}
]
| [binary() | any()]
def list_raw_messages(topic_name, since, count, options \\ []),
do: RawTopic.list_messages(topic_name, since, count, options)
@doc """
Return a message from a raw topic by its `offset`.
## Examples
iex> AltworxBook.get_raw_message("reality_network_updates", 0)
{:ok, %{timestamp: 1638430188202, value: "{\"timestamp\": 1638430188202, \"type\": \"upsert_asset\"}"}}
"""
@spec get_raw_message(topic_name :: String.t(), offset :: integer) ::
{:ok, %{timestamp: Altworx.timestamp(), value: String.t()}} | {:error, term}
def get_raw_message(topic_name, offset), do: RawTopic.get_message(topic_name, offset)
@doc """
Execute a pipeline definition on a list of messages.
## Args
* `messages` - list of raw topic messages
* `steps` - list of pipeline steps passed as function references
## Examples
iex> messages = [%{timestamp: 1694680405146, value: "{\"timestamp\": 12345}"}]
...> pipeline = [&Normalizer.Basic.decode/1, &Normalizer.Basic.set_random_type/1]
...> AltworxBook.run_pipeline(messages, pipeline)
[{:new, %Toolbox.Message{type: "a", timestamp: 1694680405146, body: %{"timestamp" => 12345}}}]
"""
@spec run_pipeline([RawTopic.message()], [Pipeline.pipeline_step()]) :: [
Pipeline.pipeline_step_result()
]
def run_pipeline(messages, steps), do: Pipeline.run(messages, steps)
@doc """
Evaluate a pipeline step on the given message.
## Args
* `message` - raw topic message, or the result of a previous pipeline step
* `step_fn` - pipeline step passed as a function reference
## Examples
iex> message = %{timestamp: 1694680405146, value: "{\"timestamp\": 12345}"}
...> AltworxBook.run_pipeline_step(message, &Normalizer.Basic.decode/1)
%Toolbox.Message{type: nil, timestamp: 1694680405146, body: %{"timestamp" => 12345}}
"""
@spec run_pipeline_step(
RawTopic.message() | {:error, term} | :ignored | Message.t(),
Pipeline.pipeline_step()
) :: {:error, term} | :ignored | Message.t()
def run_pipeline_step(message, step_fn), do: Pipeline.run_step(message, step_fn)
@doc """
Validate a pipeline output message produced by evaluating pipeline steps one by one.
Validation is built into running the whole pipeline via `AltworxBook.run_pipeline/2`.
When steps are evaluated individually, call this function to check that all required fields
have valid values.
## Args
* `message` - pipeline output message
## Examples
iex> message = %{timestamp: 1694680405146, value: "{\"timestamp\": 12345}"}
...> output_message = AltworxBook.run_pipeline_step(message, &Normalizer.Basic.decode/1)
...> AltworxBook.validate_pipeline_step_output_message(output_message)
:ok
"""
@spec validate_pipeline_step_output_message(Message.t() | term) :: :ok | {:error, term}
def validate_pipeline_step_output_message(message),
do: Pipeline.validate_output_message(message)
@doc """
Utility function to display the results of `AltworxBook.run_pipeline/2`.
Call this function from your notebook on the output of `AltworxBook.run_pipeline/2` and
a Kino presentation is displayed.
## Examples
iex> messages = [%{timestamp: 1694680405146, value: "{\"timestamp\": 12345}"}]
...> pipeline = [&Normalizer.Basic.decode/1, &Normalizer.Basic.set_random_type/1]
...> results = AltworxBook.run_pipeline(messages, pipeline)
...> AltworxBook.visualize_pipeline_results(results)
"""
def visualize_pipeline_results(results), do: Pipeline.visualize_results(results)
end