defmodule Runbox.Runtime.Stage.UnitRegistry.RoutingKeyBuilder do
@moduledoc """
Builder of msg_parser and unit_register fns used in `Runbox.Runtime.Stage.UnitRegistry`
- message_parser is lambda function used to parse `Toolbox.Message` and creates key to find unit
in unit registry
- unit_register is lambda function used to parse `Toolbox.Runtime.Stage.Unit` and creates key
to store unit in unit registry
"""
alias Runbox.Runtime.Stage.UnitRegistry, as: UR
alias Runbox.Runtime.Stage.UnitRegistry.AlternativeRegistry
alias Runbox.Utils.Map, as: MapUtils
alias Toolbox.Message, as: Msg
alias Toolbox.Runtime.Stage.Unit, as: Unit
@doc """
Gets routing key definition as argument and returns msg_parser lambda function
Lambda takes `Toolbox.Message` as argument and returns list of message body values based on
provided routing key definition.
iex> msg_parser_fn =
...> RoutingKeyBuilder.build_msg_parser({:=, [[:foo]], [[:bar]]})
...> msg_parser_fn.(%Message{body: %{foo: "message_body_key_value"}})
["message_body_key_value"]
iex> msg_parser_fn =
...> RoutingKeyBuilder.build_msg_parser({:in, [[:foo]], [[:bar]]})
...> msg_parser_fn.(%Message{body: %{foo: "message_body_key_value"}})
["message_body_key_value"]
"""
@spec build_msg_parser(UR.routing_key_def()) :: UR.msg_parser()
def build_msg_parser({:=, msg_body_paths, _}) do
fn %Msg{} = msg -> Enum.map(msg_body_paths, &MapUtils.get_path(msg.body, &1)) end
end
def build_msg_parser({:in, [msg_body_path], _}) do
fn %Msg{} = msg -> [MapUtils.get_path(msg.body, msg_body_path)] end
end
def build_msg_parser({:in, [msg_body_path1, msg_body_path2], _}) do
fn %Msg{} = msg ->
[MapUtils.get_path(msg.body, msg_body_path1), MapUtils.get_path(msg.body, msg_body_path2)]
end
end
@doc """
Gets routing key definition as argument and returns unit_register lambda function
Lambda takes alternative registers (unit indexes), alternative register name,
`Toolbox.Runtime.Stage.Unit` as arguments and returns updated alternative registers based on
provided routing key definition.
iex> unit_register_fn =
...> RoutingKeyBuilder.build_unit_register({:=, [[:foo]], [[:bar]]})
...> unit = %Unit{id: "unit_id", attributes: %{bar: "attribute_key_value"}}
...> unit_register_fn.(%{"alt_reg" => AlternativeRegistry.new()}, "alt_reg", unit)
%{"alt_reg" => %{["attribute_key_value"] => ["unit_id"]}}
iex> unit_register_fn =
...> RoutingKeyBuilder.build_unit_register({:in, [[:foo]], [[:bar]]})
...> unit = %Unit{id: "unit_id", attributes: %{bar: ["value_1", "value_2"]}}
...> unit_register_fn.(%{"alt_reg" => AlternativeRegistry.new()}, "alt_reg", unit)
%{"alt_reg" => %{["value_1"] => ["unit_id"], ["value_2"] => ["unit_id"]}}
"""
@spec build_unit_register(UR.routing_key_def()) :: UR.unit_register()
def build_unit_register({:=, _, attribute_key_paths}) do
fn alt_regs, reg_name, %Unit{} = unit ->
key = Enum.map(attribute_key_paths, &MapUtils.get_path(unit.attributes, &1))
register_unit(alt_regs, reg_name, key, unit)
end
end
def build_unit_register({:in, _, [attribute_key_path]}) do
fn alt_regs, reg_name, %Unit{} = unit ->
unit_registration_keys =
for attribute_value <- MapUtils.get_path(unit.attributes, attribute_key_path, []) do
[attribute_value]
end
Enum.reduce(unit_registration_keys, alt_regs, fn key, alt_regs ->
register_unit(alt_regs, reg_name, key, unit)
end)
end
end
def build_unit_register({:in, _, [attribute_path1, attribute_path2]}) do
fn alt_regs, reg_name, %Unit{} = unit ->
unit_registration_keys =
for attribute_value_1 <- MapUtils.get_path(unit.attributes, attribute_path1, []),
attribute_value_2 <- MapUtils.get_path(unit.attributes, attribute_path2, []) do
[attribute_value_1, attribute_value_2]
end
Enum.reduce(unit_registration_keys, alt_regs, fn key, alt_regs ->
register_unit(alt_regs, reg_name, key, unit)
end)
end
end
defp register_unit(alt_regs, reg_name, key, unit) do
update_in(alt_regs[reg_name], &AlternativeRegistry.register_unit(&1, key, unit.id))
end
end