lib/ex_buffer.ex

defmodule ExBuffer do
  @moduledoc """
  An ExBuffer is a process that maintains a collection of items and flushes
  them once certain conditions have been met.

  ExBuffers can flush based on a timeout, a maximum length (item count), a
  maximum byte size, or a combination of the three. When multiple conditions are
  used, the ExBuffer will flush when the **first** condition is met.

  ExBuffers also come with a number of helpful tools for testing and debugging.
  """

  use GenServer

  alias ExBuffer.State

  @type t :: GenServer.name() | pid()

  @ex_buffer_fields [:callback, :buffer_timeout, :max_length, :max_size]

  ################################
  # Public API
  ################################

  @doc """
  Starts an `ExBuffer` process and links it to the calling process.

  ## Options

  The following options configure the ExBuffer itself:

    * `:callback` - The function invoked to handle a flush. It should accept a
      single argument: the list of items being flushed. (Required `function()`)

    * `:buffer_timeout` - The maximum time (in ms) allowed between flushes of
      the ExBuffer. When this much time has elapsed, the ExBuffer is flushed.
      (Optional `non_neg_integer()`, Default = `:infinity`)

    * `:max_length` - The maximum allowed length (item count) of the ExBuffer.
      When the limit is hit, the ExBuffer is flushed. (Optional
      `non_neg_integer()`, Default = `:infinity`)

    * `:max_size` - The maximum allowed size (in bytes) of the ExBuffer. When
      the limit is hit (or exceeded), the ExBuffer is flushed. See
      `ExBuffer.size/1` for details on how size is computed. (Optional
      `non_neg_integer()`, Default = `:infinity`)

  Any other options are forwarded as `GenServer` options.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts \\ []) do
    # Everything that is not an ExBuffer field is handed to GenServer untouched.
    {buffer_opts, gen_server_opts} = Keyword.split(opts, @ex_buffer_fields)
    GenServer.start_link(__MODULE__, buffer_opts, gen_server_opts)
  end

  @doc """
  Lazily splits an enumerable into chunks using ExBuffer flush conditions.

  Length and size conditions are currently supported. When both are given, a
  chunk is emitted as soon as the **first** condition is met (just like an
  ExBuffer process).

  While this function is useful in its own right, it is included primarily as
  another way to synchronously test applications that use ExBuffer.

  Raises `ArgumentError` if the given options are rejected while building the
  chunking state.

  ## Options

  An enumerable can be chunked with the following options:

    * `:max_length` - The maximum allowed length (item count) of a chunk.
      (Optional `non_neg_integer()`, Default = `:infinity`)

    * `:max_size` - The maximum allowed size (in bytes) of a chunk. (Optional
      `non_neg_integer()`, Default = `:infinity`)

  > #### Warning {: .warning}
  >
  > Omitting both of the options above is permitted but will result in a
  > single chunk being emitted. One can achieve a similar result in a more performant
  > way using `Stream.into/2`. In that same vein, including only a `:max_length`
  > condition makes this function a less performant version of `Stream.chunk_every/2`.
  > This function is optimized for chunking by either size or size **and** count. Any other
  > chunking strategy can likely be achieved in a more efficient way using other methods.

  ## Example

      enum = ["foo", "bar", "baz", "foobar", "barbaz", "foobarbaz"]

      enum
      |> ExBuffer.chunk_enum!(max_length: 3, max_size: 10)
      |> Enum.into([])
      #=> [["foo", "bar", "baz"], ["foobar", "barbaz"], ["foobarbaz"]]
  """
  @spec chunk_enum!(Enumerable.t(), keyword()) :: Enumerable.t()
  def chunk_enum!(enum, opts \\ []) do
    # Only the length and size limits are honoured here; every other option
    # (including `:callback` and `:buffer_timeout`) is dropped.
    chunk_opts = Keyword.take(opts, [:max_length, :max_size])

    case init_state(chunk_opts, false) do
      {:error, reason} ->
        raise ArgumentError, to_message(reason)

      {:ok, state} ->
        # `Stream.chunk_while/4` passes (element, acc), whereas `do_insert/2`
        # expects (state, item), hence the argument swap.
        Stream.chunk_while(enum, state, fn item, acc -> do_insert(acc, item) end, &chunk_end/1)
    end
  end

  @doc """
  Returns the contents of the given `ExBuffer` as a list and resets the
  buffer, without invoking the flush callback.

  While this behavior may occasionally be desirable in a production environment,
  it is intended to be used primarily for testing and debugging.

  ## Example

      ExBuffer.insert(:test_buffer, "foo")
      ExBuffer.insert(:test_buffer, "bar")

      ExBuffer.dump(:test_buffer)
      #=> ["foo", "bar"]

      ExBuffer.length(:test_buffer)
      #=> 0
  """
  @spec dump(t()) :: list()
  def dump(buffer) do
    GenServer.call(buffer, :dump)
  end

  @doc """
  Forces a flush of the given `ExBuffer`, whether or not any flush condition
  has been met.

  While this behavior may occasionally be desirable in a production environment,
  it is intended to be used primarily for testing and debugging.

  ## Options

  An ExBuffer can be manually flushed with the following options:

    * `:async` - Determines whether or not the flush will be asynchronous. (Optional
      `boolean()`, Default = `true`)

  ## Example

      ExBuffer.insert(:test_buffer, "foo")
      ExBuffer.insert(:test_buffer, "bar")

      # Assuming the flush callback is `IO.inspect/1`
      ExBuffer.flush(:test_buffer)
      #=> outputs ["foo", "bar"]

      ExBuffer.length(:test_buffer)
      #=> 0
  """
  @spec flush(t(), keyword()) :: :ok
  def flush(buffer, opts \\ []) do
    # Both modes are issued as calls; they differ only in the request sent.
    request = if Keyword.get(opts, :async, true), do: :async_flush, else: :sync_flush
    GenServer.call(buffer, request)
  end

  @doc """
  Adds the given item to the given `ExBuffer`.

  ## Example

      ExBuffer.insert(:test_buffer, "foo")
      #=> :test_buffer items = ["foo"]

      ExBuffer.insert(:test_buffer, "bar")
      #=> :test_buffer items = ["foo", "bar"]
  """
  @spec insert(t(), term()) :: :ok
  def insert(buffer, item) do
    GenServer.call(buffer, {:insert, item})
  end

  @doc """
  Returns the number of items currently held by the given `ExBuffer`.

  While this behavior may occasionally be desirable in a production environment,
  it is intended to be used primarily for testing and debugging.

  ## Example

      ExBuffer.insert(:test_buffer, "foo")
      ExBuffer.insert(:test_buffer, "bar")

      ExBuffer.length(:test_buffer)
      #=> 2
  """
  @spec length(t()) :: non_neg_integer()
  def length(buffer) do
    GenServer.call(buffer, :length)
  end

  @doc """
  Returns the size (in bytes) of the given `ExBuffer`.

  Item size is computed using `Kernel.byte_size/1`. Because that function
  requires a bitstring input, non-bitstring items are first converted to a
  binary using `:erlang.term_to_binary/1`.

  While this behavior may occasionally be desirable in a production environment,
  it is intended to be used primarily for testing and debugging.

  ## Example

      ExBuffer.insert(:test_buffer, "foo")
      ExBuffer.insert(:test_buffer, "bar")

      ExBuffer.size(:test_buffer)
      #=> 6
  """
  @spec size(t()) :: non_neg_integer()
  def size(buffer) do
    GenServer.call(buffer, :size)
  end

  ################################
  # GenServer Callbacks
  ################################

  @doc false
  @impl GenServer
  @spec init(keyword()) :: {:ok, State.t()} | {:stop, :invalid_callback | :invalid_limit}
  def init(opts) do
    result = init_state(opts)

    case result do
      # Rejected options stop the server with the reported reason.
      {:error, reason} -> {:stop, reason}
      # A freshly built state is refreshed once before the server starts.
      {:ok, state} -> {:ok, refresh_state(state)}
    end
  end

  @doc false
  @impl GenServer
  @spec handle_call(term(), GenServer.from(), State.t()) ::
          {:reply, term(), State.t()} | {:reply, term(), State.t(), {:continue, {:flush, list()}}}
  def handle_call(:dump, _from, state) do
    # The items go straight back to the caller; the flush callback is not invoked.
    items = State.items(state)
    {:reply, items, refresh_state(state)}
  end

  def handle_call(:async_flush, _from, state) do
    # Reply first, then run the callback in `handle_continue/2`.
    {next_state, items} = flush_state(state)
    {:reply, :ok, next_state, {:continue, {:flush, items}}}
  end

  def handle_call(:sync_flush, _from, state) do
    # The callback runs before the caller receives its reply.
    next_state = do_sync_flush(state)
    {:reply, :ok, next_state}
  end

  def handle_call({:insert, item}, _from, state) do
    case do_insert(state, item) do
      # No flush condition was met: just keep the updated state.
      {:cont, next_state} ->
        {:reply, :ok, next_state}

      # A flush condition was met: reply, then flush via `handle_continue/2`.
      {:cont, items, next_state} ->
        {:reply, :ok, next_state, {:continue, {:flush, items}}}
    end
  end

  def handle_call(:length, _from, state) do
    {:reply, State.length(state), state}
  end

  def handle_call(:size, _from, state) do
    {:reply, State.size(state), state}
  end

  @doc false
  @impl GenServer
  @spec handle_continue(term(), State.t()) :: {:noreply, State.t()}
  def handle_continue({:flush, items}, state) do
    # Invoke the flush callback with the items; its return value is ignored.
    flush_callback = state.callback
    flush_callback.(items)

    {:noreply, state}
  end

  @doc false
  @impl GenServer
  @spec handle_info(term(), State.t()) ::
          {:noreply, State.t()} | {:noreply, State.t(), {:continue, {:flush, list()}}}
  # Only a timeout carrying the ref of the timer stored in the state triggers a
  # flush; a timeout with any other ref falls through to the catch-all clause.
  def handle_info({:timeout, timer_ref, :flush}, state) when state.timer == timer_ref do
    {next_state, items} = flush_state(state)
    {:noreply, next_state, {:continue, {:flush, items}}}
  end

  # Every other message is ignored.
  def handle_info(_message, state) do
    {:noreply, state}
  end

  @doc false
  @impl GenServer
  @spec terminate(term(), State.t()) :: :ok | State.t()
  # For these two reasons the buffer is not flushed.
  def terminate(:invalid_callback, _state), do: :ok
  def terminate(:invalid_limit, _state), do: :ok

  # For any other reason, synchronously flush whatever is still buffered.
  def terminate(_reason, state) do
    do_sync_flush(state)
  end

  ################################
  # Private API
  ################################

  # Builds a state from keyword options. Missing limits and a missing timeout
  # default to `:infinity`; a missing callback is passed along as `nil`.
  # `process` is forwarded to `State.new/5` unchanged.
  defp init_state(opts, process \\ true) do
    State.new(
      Keyword.get(opts, :callback),
      Keyword.get(opts, :max_length, :infinity),
      Keyword.get(opts, :max_size, :infinity),
      Keyword.get(opts, :buffer_timeout, :infinity),
      process
    )
  end

  # Inserts `item` into the state and flushes it when `State.flush?/1` says so.
  #
  # Returns `{:cont, items, state}` when a flush happened and `{:cont, state}`
  # otherwise. The same shapes are used by the `Stream.chunk_while/4` reducer
  # in `chunk_enum!/2` and by the `{:insert, item}` call handler.
  defp do_insert(state, item) do
    updated = State.insert(state, item)

    cond do
      State.flush?(updated) ->
        {flushed, items} = flush_state(updated)
        {:cont, items, flushed}

      true ->
        {:cont, updated}
    end
  end

  # Flushes the state and runs the callback inline with the flushed items.
  # Returns the refreshed state.
  defp do_sync_flush(state) do
    {fresh_state, pending_items} = flush_state(state)
    fresh_state.callback.(pending_items)
    fresh_state
  end

  # Returns `{refreshed_state, items}`, where `items` are read from the state
  # as it was before the refresh.
  defp flush_state(state) do
    refreshed = refresh_state(state)
    {refreshed, State.items(state)}
  end

  # Refreshes the state. When a finite timeout is configured, the pending flush
  # timer is cancelled and a new one is started before refreshing.
  defp refresh_state(state) do
    case state do
      # No timeout configured, so there is no timer to manage.
      %State{timeout: :infinity} ->
        State.refresh(state)

      _ ->
        cancel_upcoming_flush(state)
        State.refresh(state, schedule_next_flush(state))
    end
  end

  # Cancels the timer stored in the state, if there is one.
  defp cancel_upcoming_flush(state) do
    case state do
      %State{timer: nil} -> :ok
      _ -> Process.cancel_timer(state.timer)
    end
  end

  # Starts a timer that sends `{:timeout, ref, :flush}` to this process once
  # `state.timeout` ms have elapsed, and returns the timer ref.
  defp schedule_next_flush(state) do
    # `:erlang.start_timer/3` is used because it puts the timer ref in the
    # message, which is necessary for handling occasional race conditions.
    delay = state.timeout
    :erlang.start_timer(delay, self(), :flush)
  end

  # After-function for `Stream.chunk_while/4`: emits whatever is still buffered
  # as a final chunk, or nothing when the buffer is empty.
  defp chunk_end(state) do
    case state do
      %State{buffer: []} -> {:cont, state}
      _ -> {:cont, State.items(state), refresh_state(state)}
    end
  end

  # Converts a reason into a message by replacing its underscores with spaces.
  defp to_message(reason) do
    reason
    |> to_string()
    |> String.replace("_", " ")
  end
end