defmodule Kvasir.Kafka do
@moduledoc """
Documentation for Kvasir.Kafka.
"""
def offset({:kafka_message, offset, _key, _payload, _, _timestamp, _meta}), do: offset
def decode(
{:kafka_message_set, topic, partition, _, messages},
_topic,
key,
decoder,
_partition
) do
Enum.map(messages, &decode(&1, topic, key, decoder, partition))
end
def decode(
{:kafka_message, offset, k, payload, _, timestamp, _meta},
topic,
key,
decoder,
partition
) do
with {:ok, event} <- decoder.bin_decode(payload),
{:ok, k} <- key.parse(k, []) do
{:ok,
%{
event
| __meta__: %Kvasir.Event.Meta{
key: k,
key_type: key,
sub_key: event.__meta__.sub_key,
topic: topic,
partition: partition,
offset: offset,
timestamp: UTCDateTime.from_unix!(timestamp, :millisecond)
}
}}
end
end
def decode?(
decoder,
{:kafka_message, offset, k, payload, _, timestamp, _meta},
_topic = %{key: key, topic: topic},
partition
) do
with {:ok, event} <- decoder.bin_decode(payload),
{:ok, k} <- key.parse(k, []) do
{:ok,
%{
event
| __meta__: %Kvasir.Event.Meta{
key: k,
key_type: key,
sub_key: event.__meta__.sub_key,
topic: topic,
partition: partition,
offset: offset,
timestamp: UTCDateTime.from_unix!(timestamp, :millisecond)
}
}}
else
{:error, :unknown_event_type} -> :ok
err -> err
end
end
@base_topic_config %{
"cleanup.policy" => "delete",
"max.message.bytes" => "20485760",
"retention.ms" => "2419200000",
"delete.retention.ms" => "86400000"
}
def create_topics(client, topics, create_config, timeout \\ 5_000) do
{:state, _, hosts, _, _, _, _, config, _} = client |> Process.whereis() |> :sys.get_state()
{:ok, conn} = :kpro.connect_controller(hosts, config)
config_entries =
@base_topic_config
|> Map.merge(create_config)
|> Enum.map(fn {k, v} -> [name: k, value: v] end)
topic_settings =
Enum.map(topics, fn {topic, partitions} ->
[
name: topic,
num_partitions: partitions,
replication_factor: Map.get(create_config, "replication_factor", 1),
assignments: [],
configs: config_entries
]
end)
req = :kpro_req_lib.create_topics(0, topic_settings, %{timeout: timeout})
with {:ok, {:kpro_rsp, _, :create_topics, _, %{topics: errors}}} <-
:kpro.request_sync(conn, req, timeout) do
if err = Enum.find(errors, &(&1.error_code != :no_error)) do
{:error, err.error_code}
else
# partition_settings =
# Enum.map(topics, fn {topic, partitions} ->
# [topic: topic, new_partitions: [count: partitions, assignment: [[_BrokerId = 0]]]]
# end)
#
# attempt_partitions(client, partition_settings, timeout)
:ok
end
end
end
# defp attempt_partitions(client, config, timeout, attempt \\ 0)
# defp attempt_partitions(client, config, timeout, 5), do: raise("Failed to create partitions.")
# defp attempt_partitions(client, config, timeout, attempt) do
# {:state, _, hosts, _, _, _, _, c, _} = client |> Process.whereis() |> :sys.get_state()
# {:ok, conn} = :kpro.connect_controller(hosts, c)
# EnumX.each(config, fn p ->
# req = :kpro_req_lib.create_partitions(0, [p], %{timeout: timeout})
# with {:ok, {:kpro_rsp, _, _, _, %{topic_errors: errors}}} <-
# IO.inspect(:kpro.request_sync(conn, req, timeout)) do
# if err = Enum.find(errors, &(&1.error_code != :no_error)) do
# {:error, err.error_code}
# else
# :ok
# end
# end
# end)
# |> case do
# :ok ->
# :ok
# _ ->
# :timer.sleep(timeout)
# attempt_partitions(client, config, timeout, attempt + 1)
# end
# end
end