# lib/batcher.ex

defmodule LogflareEx.Batcher do
  @moduledoc """
  Batching cache is an Etso repo, `LogflareEx.Repo`, and stores all events to be sent to the Logflare service.

  There are 2 states that an event can be in:
  - pending
  - inflight

  If an event is inflight, it will have an `:inflight_at` timestamp stored on the struct.
  """
  use GenServer

  import Ecto.Query
  alias LogflareEx.BatchedEvent
  alias LogflareEx.BatcherRegistry
  alias LogflareEx.Client
  alias LogflareEx.Repo

  # API

  @doc """
  Creates an event in the batching cache. This event will be considered as pending if it does not have an `:inflight_at` value set.

  An event should only be created after all payload manipulations have been performed. The payload will be stored on the `:body` key.

  All timestamp fields internally on the struct are NaiveDateTime.

  Required fields:
  - :body
  - :source_token or :source_name

  Returns `{:ok, event}` when the event is stored, or `{:error, changeset}` when the
  insert is rejected.
  """
  @spec create_event(map()) :: {:ok, BatchedEvent.t()} | {:error, Ecto.Changeset.t()}
  def create_event(attrs) do
    # `:created_at` is stamped here, before the caller-supplied attrs are cast.
    %BatchedEvent{created_at: NaiveDateTime.utc_now()}
    |> BatchedEvent.changeset(attrs)
    |> Repo.insert()
  end

  @doc """
  Lists all events within the cache. All arguments provided are considered additive filters:
  when both `:source_token` and `:source_name` are given, only events matching both are returned.

  ### Example
  ```elixir
  list_events_by(:pending)
  list_events_by(:all)
  list_events_by(:inflight)
  list_events_by(:all, source_token: "...")
  list_events_by(:all, source_name: "...")
  list_events_by(:all, source_name: "...", limit: 5)
  ```

  ### Limitations

  Etso does not support the Ecto.Query `:limit` option, hence limiting is done post result fetch.
  """
  @typep list_opts :: [
           {:source_name, String.t()}
           | {:source_token, String.t()}
           | {:limit, non_neg_integer()}
         ]
  @typep status_filter :: :all | :pending | :inflight
  @spec list_events_by(status_filter(), list_opts()) :: [BatchedEvent.t()]
  def list_events_by(type, opts \\ []) when type in [:all, :pending, :inflight] do
    # Normalise the options so every supported key is present (nil when not given).
    opts = Enum.into(opts, %{source_name: nil, source_token: nil, limit: nil})

    from(e in BatchedEvent)
    |> filter_by_status(type)
    |> filter_by_source_token(opts.source_token)
    |> filter_by_source_name(opts.source_name)
    |> Repo.all()
    |> limit_results(opts.limit)
  end

  # Narrows the query to pending (no `:inflight_at`) or inflight events; `:all` is a no-op.
  defp filter_by_status(query, :pending), do: where(query, [e], is_nil(e.inflight_at))
  defp filter_by_status(query, :inflight), do: where(query, [e], not is_nil(e.inflight_at))
  defp filter_by_status(query, :all), do: query

  # Adds a `:source_token` equality filter unless no token was requested.
  defp filter_by_source_token(query, nil), do: query
  defp filter_by_source_token(query, token), do: where(query, [e], e.source_token == ^token)

  # Adds a `:source_name` equality filter unless no name was requested.
  defp filter_by_source_name(query, nil), do: query
  defp filter_by_source_name(query, name), do: where(query, [e], e.source_name == ^name)

  # Applies the limit in memory, after the fetch, because the query itself cannot carry it.
  defp limit_results(events, nil), do: events
  defp limit_results(events, limit), do: Enum.take(events, limit)

  @doc """
  Updates the event within the batching cache.

  The given `attrs` are applied to `event` through `BatchedEvent.changeset/2`.

  Returns `{:ok, event}` with the updated struct, or `{:error, changeset}` when the
  update is rejected.
  """
  @spec update_event(BatchedEvent.t(), map()) ::
          {:ok, BatchedEvent.t()} | {:error, Ecto.Changeset.t()}
  def update_event(event, attrs) do
    event
    |> BatchedEvent.changeset(attrs)
    |> Repo.update()
  end

  @doc """
  Empties the batching cache, removing pending and inflight events alike.

  Always returns `:ok`; the number of removed events is discarded.
  """
  @spec delete_all_events() :: :ok
  def delete_all_events do
    _deleted = Repo.delete_all(BatchedEvent)

    :ok
  end

  @doc """
  Performs a flush for the given source.

  Accepts a `%LogflareEx.Client{}` struct or one of the following filters:
  `:source_name` or `:source_token`.

  The target Batcher is resolved through `via/1`, so a client that has both a source
  token and a source name is addressed by its token, which is the same key the Batcher
  registers under in `start_link/1`. A client with neither set raises a
  `FunctionClauseError` from `via/1`.

  Flush is performed asynchronously: the request is cast to the Batcher and `:ok` is
  returned immediately.
  """
  @typep kw_filter :: [{:source_name, String.t()} | {:source_token, String.t()}]
  @spec flush(Client.t() | kw_filter()) :: :ok
  def flush(client_or_filter) do
    client_or_filter
    |> via()
    |> GenServer.cast(:flush)
  end

  @doc """
  Deletes a single event in the cache.

  Returns `{:ok, event}` with the deleted struct, or `{:error, changeset}` when the
  delete is rejected.

  ### Example

  ```elixir
  iex> delete_event(event)
  {:ok, %BatchedEvent{...}}
  ```
  """
  @spec delete_event(BatchedEvent.t()) :: {:ok, BatchedEvent.t()} | {:error, Ecto.Changeset.t()}
  def delete_event(%BatchedEvent{} = event) do
    Repo.delete(event)
  end

  @doc """
  Returns the via for each partitioned Batcher. Accepts a `source_token` or `source_name` filter or a `%LogflareEx.Client{}` struct.

  For a client struct, a binary `:source_token` takes precedence over `:source_name`.
  The result is a `{:via, Registry, {LogflareEx.BatcherRegistry, key}}` tuple, where
  `key` is `{:source_token, token}` or `{:source_name, name}`.

  ### Example

  ```elixir
  via(source_name: "my source")
  via(source_token: "some-uuid")
  via(%LogflareEx.Client{...})
  ```
  """
  @spec via(Client.t() | kw_filter()) ::
          {:via, Registry, {BatcherRegistry, {:source_name | :source_token, String.t()}}}
  # `"" <> value` only matches binaries, so a nil token falls through to the name clause.
  def via(%Client{source_token: "" <> token}), do: via(source_token: token)
  def via(%Client{source_name: "" <> name}), do: via(source_name: name)
  def via(source_name: name), do: {:via, Registry, {BatcherRegistry, {:source_name, name}}}
  def via(source_token: token), do: {:via, Registry, {BatcherRegistry, {:source_token, token}}}

  # GenServer

  @doc """
  Starts a Batcher process linked to the caller.

  Accepts either a `%LogflareEx.Client{}` struct or a list of options, which is turned
  into a client with `LogflareEx.Client.new/1`. The process is registered under the name
  returned by `via/1` for that client.
  """
  def start_link(opts) when is_list(opts), do: start_link(Client.new(opts))

  def start_link(%Client{} = client) do
    registered_name = via(client)
    GenServer.start_link(__MODULE__, client, name: registered_name)
  end

  # Builds the Batcher state (the client plus the partition key its events are stored
  # under) and schedules the first automatic flush.
  @impl GenServer
  def init(%Client{source_name: name, source_token: token} = client) do
    # The source token takes precedence over the source name when both are set.
    partition_key =
      case {token, name} do
        {nil, nil} -> nil
        {nil, source_name} -> {:source_name, source_name}
        {source_token, _} -> {:source_token, source_token}
      end

    state = %{client: client, key: partition_key}

    schedule_flush(state)
    {:ok, state}
  end

  # Manual flush requested through `flush/1`; does not touch the automatic flush timer.
  @impl GenServer
  def handle_cast(:flush, state) do
    :ok = flush_events(state)

    {:noreply, state}
  end

  # Timer-driven flush: sends the pending events matching the Batcher's key, then
  # schedules the next tick.
  @impl GenServer
  def handle_info(:flush, state) do
    :ok = flush_events(state)
    _ = schedule_flush(state)

    {:noreply, state}
  end

  # Takes up to `client.batch_size` pending events for this Batcher's partition key and
  # hands them to `dispatch_batch/2`. Always returns `:ok`.
  defp flush_events(%{client: client, key: {filter, value}})
       when filter in [:source_name, :source_token] do
    events = list_events_by(:pending, [{filter, value}, {:limit, client.batch_size}])

    dispatch_batch(client, events)
  end

  # Nothing pending: skip the task and the request instead of sending an empty batch.
  defp dispatch_batch(_client, []), do: :ok

  # Marks the events as inflight and sends their bodies from a linked background task.
  defp dispatch_batch(client, events) do
    inflight_at = NaiveDateTime.utc_now()
    event_ids = Enum.map(events, & &1.id)

    # Events are flagged inflight before sending so that a later flush, which only
    # lists pending events, does not pick them up again.
    batch =
      Enum.map(events, fn event ->
        {:ok, updated} = update_event(event, %{inflight_at: inflight_at})
        updated.body
      end)

    # NOTE(review): the result of `send_events/2` is not inspected, so the events are
    # removed from the cache whatever it returns — confirm this best-effort delivery
    # is intended.
    {:ok, _pid} =
      Task.start_link(fn ->
        LogflareEx.send_events(client, batch)
        Repo.delete_all(from(e in BatchedEvent, where: e.id in ^event_ids))
      end)

    :ok
  end

  # Arms the next automatic flush by sending `:flush` to this process after
  # `client.flush_interval`, unless the client has `auto_flush: false`.
  # Returns the state unchanged either way.
  defp schedule_flush(state) do
    case state do
      %{client: %{auto_flush: false}} ->
        state

      _ ->
        Process.send_after(self(), :flush, state.client.flush_interval)
        state
    end
  end
end