lib/pillar/bulk_insert_buffer.ex

defmodule Pillar.BulkInsertBuffer do
  # `~S` keeps the `#{...}` in the example below literal. With a plain heredoc it
  # would be interpolated when this module is compiled, baking a build timestamp
  # into the documentation.
  @moduledoc ~S"""
  Buffers records in memory and periodically writes them with one bulk insert.

  `use Pillar.BulkInsertBuffer` turns the calling module into a `GenServer`
  registered under the module's own name. Supported options:

    * `:pool` - (required) module whose `insert_to_table/2` is called with the
      table name and the buffered records
    * `:table_name` - (required) passed as the first argument to
      `pool.insert_to_table/2`
    * `:interval_between_inserts_in_seconds` - delay between scheduled flushes,
      `5` by default
    * `:on_errors` - 2-arity function called with the `{:error, reason}` result
      and the records of a failed insert; does nothing by default

  Records are prepended to the buffer, so the list handed to
  `pool.insert_to_table/2` (and to `:on_errors`) is ordered newest first.
  The buffer is emptied after every flush, including a failed one: records of a
  failed insert are only passed to `:on_errors` and are not retried.

  ```elixir
  defmodule BulkToLogs do
    use Pillar.BulkInsertBuffer,
      pool: ClickhouseMaster,
      table_name: "logs",
      interval_between_inserts_in_seconds: 5,
      on_errors: &__MODULE__.dump_to_file/2

    def dump_to_file(_result, records) do
      File.write("bad_inserts/#{DateTime.utc_now()}", inspect(records))
    end
  end
  ```

  ```elixir
  :ok = BulkToLogs.insert(%{value: "online", count: 133, datetime: DateTime.utc_now()})
  :ok = BulkToLogs.insert(%{value: "online", count: 134, datetime: DateTime.utc_now()})
  :ok = BulkToLogs.insert(%{value: "online", count: 132, datetime: DateTime.utc_now()})
  ....

  # all these records will be inserted in bulk once the 5-second interval elapses
  ```
  """

  defmacro __using__(options) do
    quote do
      use GenServer

      # Client API

      @doc """
      Starts the buffer process, registered under this module's name.

      The argument is ignored. Raises a `RuntimeError` when the `:pool` or
      `:table_name` option is missing.
      """
      def start_link(_any \\ nil) do
        opts = unquote(options)
        pool = Keyword.get(opts, :pool)
        table_name = Keyword.get(opts, :table_name)

        if is_nil(pool) do
          raise "#{__MODULE__} pool is not set"
        end

        if is_nil(table_name) do
          raise "#{__MODULE__} table_name is not set"
        end

        errors_handle_function =
          Keyword.get(opts, :on_errors, fn _any, _records -> :ok end)

        # State layout: {pool, table_name, buffered_records, errors_handle_function}
        GenServer.start_link(__MODULE__, {pool, table_name, [], errors_handle_function},
          name: __MODULE__
        )
      end

      @doc """
      Adds `data` to the buffer without waiting for the buffer process.
      """
      def insert(data) when is_map(data) do
        GenServer.cast(__MODULE__, {:insert, data})
      end

      @doc """
      Flushes the buffer immediately and returns `:ok` once the flush has run.
      """
      def force_bulk_insert do
        GenServer.call(__MODULE__, :do_insert, 60_000)
      end

      @doc """
      Returns the records currently waiting in the buffer, newest first.
      """
      def records_for_bulk_insert() do
        GenServer.call(__MODULE__, :records_for_bulk_insert, 60_000)
      end

      # Server callbacks

      @impl true
      def init(state) do
        schedule_work()
        {:ok, state}
      end

      @impl true
      def handle_call(:do_insert, _from, state) do
        {:reply, :ok, do_bulk_insert(state)}
      end

      def handle_call(
            :records_for_bulk_insert,
            _from,
            {_pool, _table_name, records, _errors_handle_function} = state
          ) do
        {:reply, records, state}
      end

      @impl true
      # `insert/1` only accepts maps; this clause ignores a `nil` sent by a direct cast.
      def handle_cast({:insert, nil = _data}, state) do
        {:noreply, state}
      end

      def handle_cast(
            {:insert, data},
            {pool, table_name, records, errors_handle_function} = _state
          ) do
        {:noreply, {pool, table_name, [data | records], errors_handle_function}}
      end

      @impl true
      # Periodic tick: flush whatever is buffered, then arm the next tick.
      def handle_info(:cron_like_records, state) do
        new_state = do_bulk_insert(state)
        schedule_work()
        {:noreply, new_state}
      end

      # Sends `:cron_like_records` to this process after the configured interval.
      defp schedule_work do
        # 5 seconds by default
        seconds = Keyword.get(unquote(options), :interval_between_inserts_in_seconds, 5)
        Process.send_after(self(), :cron_like_records, :timer.seconds(seconds))
      end

      # Nothing buffered: skip the call to the pool entirely.
      defp do_bulk_insert({_pool, _table_name, [], _error_handle_function} = state) do
        state
      end

      # Inserts the buffered records and returns the state with an empty buffer.
      # An `{:error, reason}` result is forwarded, together with the records,
      # to the error handler; any other result is ignored.
      defp do_bulk_insert({pool, table_name, records, error_handle_function}) do
        result = pool.insert_to_table(table_name, records)

        case result do
          {:error, _reason} -> error_handle_function.(result, records)
          _another -> nil
        end

        # Build state back, without records
        {pool, table_name, [], error_handle_function}
      end
    end
  end
end