lib/glific/metrics/worker.ex

defmodule Glific.Metrics.Worker do
  @moduledoc """
  Simple worker which caches all the counts for a specific flow and writes them
  out in batches. This allows us to amortize multiple requests into one DB write.

  Module influenced and borrowed from: https://dashbit.co/blog/homemade-analytics-with-ecto-and-elixir
  """
  use GenServer, restart: :temporary

  @registry Glific.Metrics.Registry

  alias Glific.{Flows.FlowCount, Repo}

  @doc false
  def start_link(key) do
    GenServer.start_link(__MODULE__, key, name: {:via, Registry, {@registry, key}})
  end

  @doc false
  @impl true
  def init({:flow_id, flow_id} = _key) do
    Process.flag(:trap_exit, true)
    schedule_upsert()
    {:ok, %{flow_id: flow_id, entries: %{}}}
  end

  defp update_entry(entry, count, messages) do
    messages =
      if Map.has_key?(entry, :recent_message) and entry.recent_message != %{},
        do: [entry.recent_message | messages],
        else: messages

    entry
    |> Map.put(:count, count + 1)
    |> Map.put(:recent_messages, messages)
  end

  @spec add_entry(map(), map()) :: map()
  defp add_entry(entry, entries) do
    e = entries[entry.uuid]

    {count, messages} =
      if e != nil,
        do: {e.count, e.recent_messages},
        else: {0, []}

    entries
    |> Map.put(entry.uuid, update_entry(entry, count, messages))
    |> Map.put_new(:organization_id, entry.organization_id)
  end

  @doc false
  defp schedule_upsert do
    # store to database between 2 to 4 minutes
    Process.send_after(self(), :upsert, Enum.random(120..240) * 1_000)
  end

  @doc false
  @impl true
  def handle_info({:bump, entry}, %{flow_id: flow_id, entries: entries}) do
    {:noreply, %{flow_id: flow_id, entries: add_entry(entry, entries)}}
  end

  @impl true
  def handle_info(:upsert, %{flow_id: flow_id} = state) do
    # We first unregister ourselves so we stop receiving new messages.
    Registry.unregister(@registry, {:flow_id, flow_id})

    # Schedule to stop in 2 seconds, this will give us time to process
    # any late messages.
    Process.send_after(self(), :stop, 2_000)
    {:noreply, state}
  end

  @impl true
  def handle_info(:stop, state) do
    # Now we just stop. The terminate callback will write all pending writes.
    {:stop, :shutdown, state}
  end

  @spec upsert!(map()) :: :ok
  defp upsert!(%{entries: entries}) do
    Repo.put_process_state(entries.organization_id)

    # we are only interested in the value of the map, which has map to be inserted
    entries
    |> Map.delete(:organization_id)
    |> Enum.each(&FlowCount.upsert_flow_count(elem(&1, 1)))
  end

  @doc false
  @impl true
  def terminate(_, state), do: upsert!(state)
end