# lib/kvasir_kafka/source.ex

defmodule Kvasir.Source.Kafka do
  @moduledoc """
  Documentation for Kvasir.Kafka.
  """
  @behaviour Kvasir.Source
  alias Kvasir.Kafka.OffsetTracker
  alias Kvasir.Offset
  import Kvasir.Client
  import Kvasir.Kafka, only: [decode: 5]
  import Kvasir.Publisher
  require Logger

  @impl Kvasir.Source
  def contains?(_name, _topic, _offset) do
    # Offset comparison against the tracker is currently disabled; always
    # answer :maybe so callers probe the source directly. The intended
    # implementation (kept for reference):
    #
    # case Offset.compare(OffsetTracker.offset(topic), offset) do
    #   :eq -> true
    #   :lt -> true
    #   :gt -> false
    #   :mixed -> :maybe
    # end
    :maybe
  end

  @impl Kvasir.Source
  def child_spec(name, opts \\ []) do
    # Supervisor child spec: the child runs under the client `name` as id and
    # boots through `start_link/2` with the same arguments.
    %{id: name, start: {__MODULE__, :start_link, [name, opts]}}
  end

  # Boots the Kafka source: registers metrics, starts the :brod application,
  # connects a named brod client, optionally creates topics and producers,
  # and supervises an offset tracker.
  #
  # Options:
  #   * `:servers` (required) - list of "host[:port]" strings or {host, port} tuples
  #   * `:connect_timeout` - per-topic producer start timeout in ms (default 120_000)
  #   * `:start_producers` - start one producer per initialized topic (default true)
  #   * `:track_offsets` - supervise an OffsetTracker child (default true)
  #   * `:initialize` - map of topic => config used for creation/producer startup
  #   * `:auto_create_topics` / `:auto_create_config` - opt-in topic creation
  #   * all remaining options are merged into the brod client config
  def start_link(name, opts \\ []) do
    Kvasir.Kafka.Metrics.create()

    servers = prepare_servers(opts[:servers])
    connect_timeout = opts[:connect_timeout] || 120_000
    start_producers = Keyword.get(opts, :start_producers, true)
    track_offsets = Keyword.get(opts, :track_offsets, true)

    # Defaults first, then user opts, minus the keys consumed by this module.
    # NOTE(review): `:connect_timeout`, `:start_producers` and `:track_offsets`
    # are not dropped here and therefore leak into the brod client config -
    # presumably harmless, but confirm brod tolerates unknown options.
    conn_config =
      [
        auto_start_producers: true,
        allow_topic_auto_creation: false,
        default_producer_config: [],
        reconnect_cool_down_seconds: 5
      ]
      |> Keyword.merge(opts)
      |> Keyword.drop(~w(servers initialize auto_create_topics auto_create_config)a)

    {:ok, _} = :application.ensure_all_started(:brod)
    :ok = :brod.start_client(servers, name, conn_config)

    if init = opts[:initialize] do
      # Topic auto-creation is opt-in; env-var style string flags are accepted.
      if opts[:auto_create_topics] in [true, "TRUE", "1"],
        do: Kvasir.Kafka.create_topics(name, init, opts[:auto_create_config] || %{})

      # Start producers for all initialized topics concurrently, then wait for
      # each to come up within `connect_timeout`.
      if start_producers do
        init
        |> Map.keys()
        |> Enum.map(fn topic -> Task.async(fn -> :brod.start_producer(name, topic, []) end) end)
        |> Enum.each(&Task.await(&1, connect_timeout))
      end
    end

    children =
      if track_offsets do
        [
          OffsetTracker.child_spec(opts[:initialize], servers, conn_config)
        ]
      else
        []
      end

    Supervisor.start_link(children, strategy: :one_for_one, name: Module.concat(name, Supervisor))
  end

  # Normalizes the `:servers` option into a list of `{host, integer_port}`
  # tuples; raises when no servers are configured.
  defp prepare_servers(nil), do: raise("Need to set `:servers` option.")
  defp prepare_servers([]), do: raise("Need to set `:servers` option.")
  defp prepare_servers(servers) when is_list(servers), do: Enum.map(servers, &normalize_server/1)

  # Accepts {host, port} tuples (port as integer or numeric string) as well as
  # bare "host" / "host:port" strings; the port defaults to 9092.
  defp normalize_server({host, port}) when is_integer(port), do: {host, port}
  defp normalize_server({host, port}), do: {host, String.to_integer(port)}

  defp normalize_server(server) do
    case String.split(server, ":", parts: 2, trim: true) do
      [host, port] -> {String.trim(host), port |> String.trim() |> String.to_integer()}
      [host] -> {String.trim(host), 9092}
    end
  end

  ### Writing ###

  # @impl Kvasir.Source
  # def publish(client, topic, event = %type{}) do
  #   key = Kvasir.Event.key(event)

  #   with {:ok, k} <- topic.key.dump(key, []),
  #        {:ok, p} <- topic.key.partition(key, topic.partitions),
  #        {:ok, data} <- topic.module.bin_encode(event) do
  #     t = topic.topic
  #     start = :erlang.monotonic_time()

  #     client
  #     |> do_publish(t, p, to_string(k), data)
  #     |> report_publish_metric(type, t, p, start)
  #   end
  # end

  @impl Kvasir.Source
  def commit(client, topic, event)

  def commit(client, topic, event = %type{__meta__: meta = %{key: key}}) do
    # Routing metadata (key/topic/partition) travels in the Kafka record
    # itself, so it is stripped from the payload before encoding.
    bare_meta = %{meta | key: nil, topic: nil, partition: nil}

    with {:ok, dumped_key} <- topic.key.dump(key, []),
         {:ok, partition} <- topic.key.partition(key, topic.partitions),
         {:ok, payload} <- topic.module.bin_encode(%{event | __meta__: bare_meta}) do
      kafka_topic = topic.topic
      started_at = :erlang.monotonic_time()

      client
      |> do_commit(event, kafka_topic, partition, to_string(dumped_key), payload)
      |> report_publish_metric(type, kafka_topic, partition, started_at)
    end
  end

  ### Reading ###

  # Starts a consumer-group subscription on `topic` via
  # `:brod_group_subscriber_v2`, running under a :rest_for_one supervisor so
  # the subscriber restarts whenever its client does.
  #
  # Options:
  #   * `:from` - Offset to start from; defaults to :earliest on every partition
  #   * `:only` - restrict decoding to the given event types
  #   * `:mode` - :batch delivers whole message sets, anything else single messages
  #   * `:group` / `:group_config` / `:consumer_config` - forwarded to brod
  #   * `:state` - opaque state handed to `callback_module` via init_data
  @impl Kvasir.Source
  def subscribe(client, topic, callback_module, opts \\ []) do
    offset =
      if f = opts[:from] do
        f
      else
        Offset.create(Map.new(0..(topic.partitions - 1), &{&1, :earliest}))
      end

    # brod takes a single begin_offset for the whole subscription, so use the
    # lowest requested partition offset here; the full per-partition offset is
    # also passed through init_data - presumably the subscriber fast-forwards
    # per partition from there (TODO confirm in Kvasir.Kafka.Subscriber).
    begin = offset.partitions |> Map.values() |> Enum.reduce(&Offset.min/2)

    decoder = if(only = opts[:only], do: topic.module.filter(only), else: topic.module)

    consumer_config =
      opts
      |> Keyword.get(:consumer_config, [])
      |> Keyword.put_new(:begin_offset, begin)
      |> Keyword.put_new(:offset_reset_policy, :reset_to_earliest)

    # :batch mode receives Kafka message sets, default mode single messages.
    {cb_module, message_type} =
      case opts[:mode] do
        :batch -> {Kvasir.Kafka.BatchSubscriber, :message_set}
        _ -> {Kvasir.Kafka.Subscriber, :message}
      end

    with {:ok, c, spec} <- client_child_spec(client) do
      children = [
        spec,
        %{
          id: :subscriber,
          type: :supervisor,
          start:
            {:brod_group_subscriber_v2, :start_link,
             [
               %{
                 client: c,
                 group_id: opts[:group],
                 group_config: Keyword.get(opts, :group_config, []),
                 consumer_config: consumer_config,
                 cb_module: cb_module,
                 topics: [topic.topic],
                 message_type: message_type,
                 init_data: {topic, offset, decoder, callback_module, opts[:state]}
               }
             ]}
        }
      ]

      Supervisor.start_link(children, strategy: :rest_for_one)
    end
  end

  # Attaches a lightweight listener to `topic`: each message is decoded and
  # handed to `callback`. A helper process owns the brod topic subscriber and
  # sleeps forever; the caller blocks in `wait_for_subscribe/1` until the
  # helper reports that the subscription is up (or that it failed).
  @impl Kvasir.Source
  def listen(client, topic, callback, opts \\ []) do
    # With `:from`, resume exactly at the given partition offsets; otherwise
    # listen on all partitions starting at :latest (new messages only).
    {partitions, resume_offsets} =
      if f = opts[:from] do
        p = f.partitions
        {Map.keys(p), Enum.map(p, fn {k, v} -> {k, v} end)}
      else
        {Enum.map(0..(topic.partitions - 1), & &1), []}
      end

    starter = self()

    decoder = if(only = opts[:only], do: topic.module.filter(only), else: topic.module)

    fn ->
      with {:ok, c} <- client_start_link(client),
           {:ok, _} <-
             :brod_topic_subscriber.start_link(
               c,
               topic.topic,
               partitions,
               # [begin_offset: :earliest]
               _consumerConfig = [begin_offset: :latest],
               resume_offsets,
               :message,
               &listen_call/3,
               {topic, callback, decoder}
             ) do
        # Keep this process alive as the owner/link of the subscriber.
        send(starter, :subscriber_up)
        Process.sleep(:infinity)
      else
        err -> send(starter, {:subscriber_failed, err})
      end
    end
    |> spawn_link
    |> wait_for_subscribe()
  end

  # Blocks until the spawned listener process reports readiness.
  #
  # Returns `{:ok, controller}` once the subscriber is up, the propagated
  # error when it failed to start, or `{:error, :subscribe_timeout}` (killing
  # the controller) when nothing arrives within 5s.
  defp wait_for_subscribe(controller) do
    receive do
      :subscriber_up ->
        # FIX: `Process.link/1` only ever returns `true` - a dead peer is
        # signalled with an exit, not a `false` return - so the previous
        # `if Process.link(controller)` else-branch was unreachable. Check
        # liveness explicitly so a controller that died right after
        # reporting up yields the intended error tuple.
        if Process.alive?(controller) do
          Process.link(controller)
          {:ok, controller}
        else
          {:error, :subscribe_failed}
        end

      {:subscriber_failed, err} ->
        err
    after
      5_000 ->
        Process.exit(controller, :kill)
        {:error, :subscribe_timeout}
    end
  end

  # brod topic-subscriber callback: decode the raw message and invoke the
  # user callback. `:ok` acks the message; any other value (including decode
  # errors other than unknown types) is returned to brod untouched.
  defp listen_call(partition, message, state = {topic, callback, decoder}) do
    outcome =
      case Kvasir.Kafka.decode?(decoder, message, topic, partition) do
        {:ok, event} -> callback.(event)
        other -> other
      end

    case outcome do
      :ok -> {:ok, :ack, state}
      other -> other
    end
  end

  # Returns `{:ok, stream}` lazily reading events from Kafka.
  #
  # Starting offsets resolve in priority order: explicit `:from` offset, the
  # partition derived from `:key`/`:id`, an explicit `:partition`, otherwise
  # every partition at its currently tracked offset. `:key` additionally
  # filters fetched messages by their Kafka record key before decoding;
  # `:events` restricts which event types are decoded.
  @impl Kvasir.Source
  def stream(client, topic, opts \\ []) do
    offset =
      cond do
        f = opts[:from] ->
          f

        k = opts[:key] || opts[:id] ->
          {:ok, p} = topic.key.partition(k, topic.partitions)
          Offset.create(p, OffsetTracker.offset(topic.topic, p))

        p = opts[:partition] ->
          Offset.create(p, OffsetTracker.offset(topic.topic, p))

        # Default branch (`:all` is simply a truthy literal): all partitions.
        :all ->
          OffsetTracker.offset(topic.topic)
      end

    # Compare against the raw key field of brod's kafka_message record so
    # non-matching messages are skipped before the (more expensive) decode.
    pre_filter =
      case opts[:key] do
        nil ->
          fn _ -> true end

        key ->
          {:ok, m} = topic.key.dump(key, [])
          m = to_string(m)
          fn {:kafka_message, _, k, _, _, _, _} -> k == m end
      end

    decoder = if(only = opts[:events], do: topic.module.filter(only), else: topic.module)

    read_timeout = Keyword.get(opts, :read_timeout, 60_000)

    # The accumulator is the Offset struct; `reducer/2` removes exhausted
    # partitions and the stream halts once none remain.
    if Enum.count(offset.partitions) == 1 do
      # Single partition: fetch inline, no task overhead.
      {:ok,
       Stream.resource(
         fn -> offset end,
         fn
           f = %{partitions: p} when p == %{} ->
             {:halt, f}

           f = %{partitions: p} ->
             r =
               p
               |> Enum.map(fn {p, o} ->
                 {p, read(client, topic.topic, topic.key, decoder, pre_filter, p, o)}
               end)
               |> Enum.reduce({[], f}, &reducer/2)

             Logger.debug(fn ->
               "#{inspect(__MODULE__)}[#{inspect(client)}]: Read #{Enum.count(elem(r, 0))}"
             end)

             r
         end,
         fn _ -> :ok end
       )}
    else
      # Multiple partitions: fetch each partition concurrently per round.
      {:ok,
       Stream.resource(
         fn -> offset end,
         fn
           f = %{partitions: p} when p == %{} ->
             {:halt, f}

           f = %{partitions: p} ->
             r =
               p
               |> Enum.map(fn {p, o} ->
                 {p,
                  Task.async(fn ->
                    read(client, topic.topic, topic.key, decoder, pre_filter, p, o)
                  end)}
               end)
               |> Enum.map(fn {p, task} -> {p, Task.await(task, read_timeout)} end)
               |> Enum.reduce({[], f}, &reducer/2)

             Logger.debug(fn ->
               "#{inspect(__MODULE__)}[#{inspect(client)}]: Read #{Enum.count(elem(r, 0))}"
             end)

             r
         end,
         fn _ -> :ok end
       )}
    end
  end

  # Folds one partition's fetch result into the stream accumulator.
  #
  # `high` is the partition's total/high watermark and `last` the offset of
  # the last message read. While messages remain, the partition's offset is
  # advanced so the next round resumes after `last`; otherwise the partition
  # is dropped, letting the stream halt once no partitions remain.
  defp reducer({partition, {high, last, events}}, {collected, offsets}) do
    resume_at = last + 1

    next_offsets =
      if high > resume_at do
        Offset.set(offsets, partition, resume_at)
      else
        %{offsets | partitions: Map.delete(offsets.partitions, partition)}
      end

    {collected ++ events, next_offsets}
  end

  # Fetches a batch of messages for one partition, returning
  # `{high_watermark, last_offset_read, decoded_events}`.
  #
  # Out-of-range offsets are retried once from the tracker's current offset
  # (or yield an empty result). Any raised/thrown failure is retried up to 11
  # times with linear backoff (`crash * 10` ms) before being re-raised.
  defp read(client, topic, key, decoder, filter, partition, offset, crash \\ 0)

  defp read(client, topic, key, decoder, filter, partition, offset, crash) do
    case :brod.fetch(
           client,
           topic,
           partition,
           offset,
           %{max_bytes: 1024 * 1024 * 1024, max_wait_time: 1}
         ) do
      {:ok, {total, m}} ->
        # Resume from the last message's offset; keep the requested offset
        # when the batch came back empty.
        continue = if e = List.last(m), do: Kvasir.Kafka.offset(e), else: offset

        {total, continue,
         m
         |> Enum.filter(filter)
         |> Enum.map(&decode(&1, topic, key, decoder, partition))
         |> read_reduce([])}

      {:error, :offset_out_of_range} ->
        t = OffsetTracker.offset(topic, partition)

        # Retry from the tracked offset when the request fell inside the
        # valid range; otherwise report the partition as exhausted.
        if offset >= 0 and offset <= t,
          do: read(client, topic, key, decoder, filter, partition, t),
          else: {offset, offset - 1, []}
    end
  rescue
    e ->
      if crash <= 10 do
        :timer.sleep(crash * 10)
        read(client, topic, key, decoder, filter, partition, offset, crash + 1)
      else
        reraise e, __STACKTRACE__
      end
  catch
    # FIX: the previous `catch e, _ -> ... throw(e)` bound `e` to the catch
    # *kind* (:throw/:exit), so once retries were exhausted it threw the
    # class atom and destroyed the original reason. Re-raise the original
    # kind/reason with the stacktrace instead.
    kind, reason ->
      if crash <= 10 do
        :timer.sleep(crash * 10)
        read(client, topic, key, decoder, filter, partition, offset, crash + 1)
      else
        :erlang.raise(kind, reason, __STACKTRACE__)
      end
  end

  # Collects successfully decoded events in original order. Events of unknown
  # type are silently dropped; any other decode error aborts the read by
  # raising a RuntimeError carrying the inspected error tuple.
  defp read_reduce([{:ok, event} | rest], acc), do: read_reduce(rest, [event | acc])
  defp read_reduce([{:error, :unknown_event_type} | rest], acc), do: read_reduce(rest, acc)
  defp read_reduce([error = {:error, _} | _rest], _acc), do: raise(inspect(error))
  defp read_reduce([], acc), do: :lists.reverse(acc)

  # Compiles a dedicated publisher module (`target`) at runtime for one topic.
  #
  # The generated module exposes `child_spec/1`, `start_link/1` and
  # `publish/1`; `publish/1` mirrors `commit/3` (dump key, pick partition,
  # strip routing meta, binary-encode, commit, report metric) with the topic
  # parameters baked in at compile time. With `pool_size > 1` it supervises a
  # pool of brod clients and spreads publishes across them.
  @impl Kvasir.Source
  def generate_dedicated_publisher(name, target, topic, opts)

  def generate_dedicated_publisher(
        name,
        target,
        %{key: key, module: module, partitions: partitions, topic: topic},
        opts
      ) do
    pool_size = Keyword.get(opts, :pool_size, 1)

    {client, extra} =
      if pool_size > 1 do
        # Random-looking non-zero constant baked into the module; used below
        # to spread unique integers across the pool. NOTE(review): this is a
        # pseudo-random spread, not strict round-robin - confirm intent.
        diff = -(:erlang.unique_integer([:positive]) - :erlang.unique_integer([:positive]))
        clients = Enum.map(1..pool_size, &Module.concat([target, "C#{&1}"]))

        # Build `client/1` clauses mapping pool index -> client module name.
        get =
          Enum.reduce(
            Enum.with_index(clients),
            quote do
              @spec client(non_neg_integer) :: module
              defp client(id)
            end,
            fn {w, i}, acc ->
              quote do
                unquote(acc)
                defp client(unquote(i)), do: unquote(w)
              end
            end
          )

        # `client` AST: pick a pool member per publish call;
        # `extra` AST: a start_link/1 supervising all pooled clients.
        {quote do
           [:positive]
           |> :erlang.unique_integer()
           |> :erlang.div(unquote(diff))
           |> rem(unquote(Enum.count(clients)))
           |> client()
         end,
         quote do
           @doc false
           def start_link(opts \\ [])

           def start_link(_opts) do
             children =
               Enum.map(unquote(clients), fn c ->
                 {:ok, ^c, spec} = client_child_spec(unquote(name), name: c)
                 Map.put(spec, :id, c)
               end)

             Supervisor.start_link(children, strategy: :one_for_one, name: __MODULE__)
           end

           unquote(get)
         end}
      else
        # Single client: the generated module itself is the brod client name.
        {quote(do: __MODULE__),
         quote do
           @doc false
           def start_link(opts \\ [])

           def start_link(_opts),
             do: client_start_link(unquote(name), name: __MODULE__, named: false)
         end}
      end

    # Allow hot re-generation of an already-compiled publisher module.
    Code.compiler_options(ignore_module_conflict: true)

    compiled =
      Code.compile_quoted(
        quote do
          defmodule unquote(target) do
            @moduledoc false
            import Kvasir.Client
            import Kvasir.Publisher

            @doc false
            def child_spec(opts \\ [])

            def child_spec(opts) do
              %{
                id: __MODULE__,
                type: :supervisor,
                start: {__MODULE__, :start_link, [opts]}
              }
            end

            unquote(extra)

            @doc false
            @spec publish(Kvasir.Event.t()) :: {:ok, Kvasir.Event.t()} | {:error, term}
            def publish(event)

            # Same pipeline as Kvasir.Source.Kafka.commit/3, with key module,
            # partition count and topic inlined as compile-time constants.
            def publish(event = %type{__meta__: meta = %{key: key}}) do
              with {:ok, k} <- unquote(key).dump(key, []),
                   {:ok, p} <- unquote(key).partition(key, unquote(partitions)),
                   e = %{event | __meta__: %{meta | key: nil, topic: nil, partition: nil}},
                   {:ok, data} <- unquote(module).bin_encode(e) do
                start = :erlang.monotonic_time()

                unquote(client)
                |> do_commit(event, unquote(topic), p, to_string(k), data)
                |> report_publish_metric(type, unquote(topic), p, start)
              end
            end
          end
        end
      )

    Code.compiler_options(ignore_module_conflict: false)

    # compile_quoted returns [{module, bytecode}] on success for one module.
    if match?([{_, _}], compiled), do: :ok, else: {:error, :failed_to_compile_dedicated_publisher}
  end
end