lib/cachex/services/courier.ex

defmodule Cachex.Services.Courier do
  @moduledoc """
  Dispatch service to retrieve values from remote calls.

  The Courier provides the main implementation for fallbacks triggered
  by calls to the `fetch()` command. It acts as a synchronization point
  for task execution to avoid duplicating calls when loading a value.

  The Courier uses a very simple algorithm to determine when to execute
  a fallback, so there's very little overhead to synchronizing calls
  through it. As tasks are dispatched via spawned processes, very little
  work actually happens inside the service process itself.
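
  As a rough sketch of the behaviour this provides (assuming a started
  cache named `:my_cache`), concurrent calls to `Cachex.fetch/4` for the
  same missing key will execute the fallback only once. The call which
  triggered the load receives a `:commit` result, while the other waiting
  callers receive an `:ok` result carrying the same value:

      # both calls race on the same missing key, but the fallback only
      # runs once and the loaded value is shared between the two callers
      task =
        Task.async(fn ->
          Cachex.fetch(:my_cache, "key", &String.upcase/1)
        end)

      Cachex.fetch(:my_cache, "key", &String.upcase/1)
      Task.await(task)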
  """
  use GenServer

  # import spec macros
  import Cachex.Spec

  # add some aliases
  alias Cachex.Actions
  alias Cachex.Actions.Put
  alias Cachex.ExecutionError

  ##############
  # Public API #
  ##############

  @doc """
  Starts a new Courier process for a cache.
  """
  @spec start_link(Spec.cache()) :: GenServer.on_start()
  def start_link(cache(name: name) = cache),
    do: GenServer.start_link(__MODULE__, cache, name: name(name, :courier))

  @doc """
  Dispatches the Courier to execute a task.

  The task provided must be a closure with arity 0, in order to
  simplify the interfaces internally. This is a blocking remote
  call which will wait until a result can be loaded.
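
  As an illustrative sketch (assuming `cache` is bound to a valid cache
  record), a dispatch might look like the following, with the closure
  returning a value in the usual fetch result format:

      # assumes `alias Cachex.Services.Courier`, with `load_value/1`
      # standing in as a hypothetical loader
      Courier.dispatch(cache, "key", fn ->
        {:commit, load_value("key")}
      end)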
  """
  @spec dispatch(Spec.cache(), any, (() -> any)) :: any
  def dispatch(cache() = cache, key, task) when is_function(task, 0),
    do: service_call(cache, :courier, {:dispatch, key, task, local_stack()})

  ####################
  # Server Callbacks #
  ####################

  @doc false
  # Initializes a Courier service using a cache record.
  #
  # This will create a Tuple containing the cache record as well as
  # the Map used to track callers waiting on in-flight tasks.
  def init(cache),
    do: {:ok, {cache, %{}}}

  @doc false
  # Dispatches a task to be carried out by the Courier.
  #
  # Tasks will only be executed if they're not already in progress. This
  # is tracked at the key level, so only a single task can be in flight
  # for any given key at a time.
  #
  # Due to the async nature of task execution, this function returns with
  # `:noreply` before the task has completed; the `:notify` clause of
  # `handle_info/2` receives the result of the task after completion
  # (regardless of outcome) and replies to the waiting callers.
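  #
  # As an illustration of the tracked state (the caller names here are
  # purely for example), after two concurrent dispatches for the same
  # key the server state looks roughly like:
  #
  #   {cache, %{"key" => [second_caller, first_caller]}}
  #
  # Callers are stored most-recent-first and reversed on notification,
  # so that the caller which triggered the task receives the `:commit`
  # reply.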
  def handle_call({:dispatch, key, task, stack}, caller, {cache, tasks}) do
    references =
      case Map.get(tasks, key, []) do
        [] ->
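          # no task in flight for this key; spawn a worker to run it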
          parent = self()

          spawn(fn ->
            result =
              try do
                task.()
              rescue
                e ->
                  {
                    :error,
                    %ExecutionError{
                      message: Exception.message(e),
                      stack: stack_compat() ++ stack
                    }
                  }
              end

            formatted = Actions.format_fetch_value(result)
            normalized = Actions.normalize_commit(formatted)

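            # persist committed values back into the cache, without
            # notifying any attached hooks of the write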
            with {:commit, val, options} <- normalized do
              Put.execute(cache, key, val, [const(:notify_false) | options])
            end

            send(parent, {:notify, key, formatted})
          end)

          [caller]

        li ->
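          # a task is already in flight for this key; join the waiting list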
          [caller | li]
      end

    {:noreply, {cache, Map.put(tasks, key, references)}}
  end

  @doc false
  # Receives a notification of a previously completed task.
  #
  # This will reply to all processes waiting for the result of the
  # specified task, and remove the task from the tracked state.
  #
  # All waiting callers except the one which triggered the task will be
  # given an `:ok` tag rather than a `:commit` tag, to make it possible
  # to know which call to `fetch/4` actually loaded the backing value.
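  #
  # As an example, a task result of `{:commit, "value"}` is replied to the
  # caller which triggered the task as-is, while every other waiting
  # caller receives `{:ok, "value"}` instead.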
  def handle_info({:notify, key, result}, {cache, tasks}) do
    callers =
      tasks
      |> Map.get(key, [])
      |> Enum.reverse()

    with [owner | listeners] <- callers do
      GenServer.reply(owner, result)

      result =
        case result do
          {:commit, value, _} ->
            {:ok, value}

          {:commit, value} ->
            {:ok, value}

          value ->
            value
        end

      for caller <- listeners do
        GenServer.reply(caller, result)
      end
    end

    {:noreply, {cache, Map.delete(tasks, key)}}
  end

  ###############
  # Private API #
  ###############

  # Generates a stack trace prior to dispatch.
  #
  # The two `tl/1` calls drop the frames added by `Process.info/2` and
  # this function itself, leaving only the frames of the calling code.
  defp local_stack do
    self()
    |> Process.info(:current_stacktrace)
    |> elem(1)
    |> tl()
    |> tl()
  end
end