lib/external_service.ex

defmodule ExternalService do
  @moduledoc """
  ExternalService handles all retry and circuit breaker logic for calls to external services.
  """

  alias ExternalService.RetryOptions
  alias ExternalService.RateLimit
  alias :fuse, as: Fuse

  require Logger

  @typedoc "Name of a fuse"
  @type fuse_name :: term()

  @typedoc "Error tuple returned when the allowable number of retries has been exceeded"
  @type retries_exhausted :: {:error, {:retries_exhausted, reason :: any}}

  @typedoc "Error tuple returned when a fuse has been melted enough times that the fuse is blown"
  @type fuse_blown :: {:error, {:fuse_blown, fuse_name}}

  @typedoc "Error tuple returned when a fuse has not been initialized with `ExternalService.start/1`"
  @type fuse_not_found :: {:error, {:fuse_not_found, fuse_name}}

  @typedoc "Union type representing all the possible error tuple return values"
  @type error :: retries_exhausted | fuse_blown | fuse_not_found

  @type retriable_function_result ::
          :retry | {:retry, reason :: any()} | (function_result :: any())

  @type retriable_function :: (() -> retriable_function_result())

  @typedoc """
  Strategy controlling fuse behavior.
  """
  @type fuse_strategy ::
          {:standard, max_melt_attempts :: pos_integer(), time_window :: pos_integer()}
          | {:fault_injection, rate :: float(), max_melt_attempts :: pos_integer(),
             time_window :: pos_integer()}

  @typedoc """
  A tuple specifying rate-limiting behavior.

  The first element of the tuple is the number of calls to allow in a given time window.
  The second element is the time window in milliseconds.
  """
  @type rate_limit :: {limit :: pos_integer(), time_window :: pos_integer()}

  @typedoc """
  The sleep function to be called when reaching the configured rate limit quota.

  In some situations, like tests, blocking the process for an extended period of
  time can be undesired. In these cases this function can be changed. Defaults
  to `Process.sleep/1`.
  """
  @type sleep_function :: (number -> any)

  @typedoc """
  Options used for controlling circuit breaker and rate-limiting behavior.

  See the [fuse docs](https://hexdocs.pm/fuse/) for further information about available fuse options.
  """
  @type options :: [
          fuse_strategy: fuse_strategy(),
          fuse_refresh: pos_integer(),
          rate_limit: rate_limit(),
          sleep_function: sleep_function()
        ]

  @default_fuse_options %{
    fuse_strategy: {:standard, 10, 10_000},
    fuse_refresh: 60_000
  }

  defmodule RetriesExhaustedError do
    @moduledoc """
    Exception raised by `ExternalService.call!/3` when the allowable number of retries has been
    exceeded.
    """
    defexception [:message]
  end

  defmodule FuseBlownError do
    @moduledoc """
    Exception raised by `ExternalService.call!/3` when a fuse has been melted enough times that
    the fuse is blown.
    """
    defexception [:message]
  end

  defmodule FuseNotFoundError do
    @moduledoc """
    Exception raised by `ExternalService.call!/3` when a fuse has not been initialized with
    `ExternalService.start/1`.
    """
    defexception [:message]
  end

  defmodule State do
    @moduledoc false

    defstruct [:fuse_name, :fuse_options, :rate_limit]

    def init(fuse_name, fuse_options, rate_limit) do
      state = %__MODULE__{
        fuse_name: fuse_name,
        fuse_options: fuse_options,
        rate_limit: rate_limit
      }

      Agent.start(fn -> state end, name: registered_name(fuse_name))
      state
    end

    def get(fuse_name), do: Agent.get(registered_name(fuse_name), & &1)

    def registered_name(fuse_name), do: Module.concat(fuse_name, __MODULE__)
  end

  @doc """
  Initializes a new fuse for a specific service.

  The `fuse_name` is a term that uniquely identifies an external service within the scope of
  an application.

  The `options` argument allows for controlling the circuit breaker behavior and rate-limiting
  behavior when making calls to the external service. See `t:options/0` for details.
  """
  @spec start(fuse_name(), options()) :: :ok
  def start(fuse_name, options \\ []) do
    fuse_opts = {
      Keyword.get(options, :fuse_strategy, @default_fuse_options.fuse_strategy),
      {:reset, Keyword.get(options, :fuse_refresh, @default_fuse_options.fuse_refresh)}
    }

    :ok = Fuse.install(fuse_name, fuse_opts)
    rate_limit = RateLimit.new(fuse_name, Keyword.get(options, :rate_limit), options)
    State.init(fuse_name, fuse_opts, rate_limit)
    :ok
  end

  @doc """
  Stops the fuse for a specific service.
  """
  @spec stop(fuse_name()) :: :ok
  def stop(fuse_name) when is_atom(fuse_name) do
    :ok = Fuse.remove(fuse_name)
    :ok = State.registered_name(fuse_name) |> Agent.stop()
    :ok
  end

  @doc """
  Resets the given fuse.

  After reset, the fuse will be unbroken with no melts.
  """
  @spec reset_fuse(fuse_name()) :: :ok | {:error, :not_found}
  def reset_fuse(fuse_name), do: Fuse.reset(fuse_name)

  @doc """
  Given a fuse name and retry options execute a function handling any retry and circuit breaker
  logic.

  `ExternalService.start` must be run with the fuse name before using call.

  The provided function can indicate that a retry should be performed by returning the atom
  `:retry` or a tuple of the form `{:retry, reason}`, where `reason` is any arbitrary term, or by
  raising a `RuntimeError`. Any other result is considered successful so the operation will not be
  retried and the result of the function will be returned as the result of `call`.
  """
  @spec call(fuse_name(), RetryOptions.t(), retriable_function()) ::
          error | (function_result :: any)
  def call(fuse_name, retry_opts \\ %RetryOptions{}, function) do
    case call_with_retry(fuse_name, retry_opts, function) do
      {:no_retry, result} -> result
      {:error, :retry} -> {:error, {:retries_exhausted, :reason_unknown}}
      {:error, {:retry, reason}} -> {:error, {:retries_exhausted, reason}}
      result -> result
    end
  end

  @doc """
  Like `call/3`, but raises an exception if retries are exhausted or the fuse is blown.
  """
  @spec call!(fuse_name(), RetryOptions.t(), retriable_function()) ::
          function_result :: any | no_return
  def call!(fuse_name, retry_opts \\ %RetryOptions{}, function) do
    case call_with_retry(fuse_name, retry_opts, function) do
      {:no_retry, result} ->
        result

      {:error, :retry} ->
        raise ExternalService.RetriesExhaustedError, message: "fuse_name: #{fuse_name}"

      {:error, {:retry, reason}} ->
        raise ExternalService.RetriesExhaustedError,
          message: "reason: #{inspect(reason)}, fuse_name: #{fuse_name}"

      {:error, {:fuse_blown, fuse_name}} ->
        raise ExternalService.FuseBlownError, message: inspect(fuse_name)

      {:error, {:fuse_not_found, fuse_name}} ->
        raise ExternalService.FuseNotFoundError, message: fuse_not_found_message(fuse_name)
    end
  end

  @doc """
  Asynchronous version of `ExternalService.call`.

  Returns a `Task` that may be used to retrieve the result of the async call.
  """
  @spec call_async(fuse_name(), RetryOptions.t(), retriable_function()) :: Task.t()
  def call_async(fuse_name, retry_opts \\ %RetryOptions{}, function) do
    Task.async(fn -> call(fuse_name, retry_opts, function) end)
  end

  @doc """
  Parallel, streaming version of `ExternalService.call`.

  See `call_async_stream/5` for full documentation.
  """
  @spec call_async_stream(Enumerable.t(), fuse_name(), (any() -> retriable_function_result())) ::
          Enumerable.t()
  def call_async_stream(enumerable, fuse_name, function) when is_function(function),
    do: call_async_stream(enumerable, fuse_name, %RetryOptions{}, [], function)

  @doc """
  Parallel, streaming version of `ExternalService.call`.

  See `call_async_stream/5` for full documentation.
  """
  @spec call_async_stream(
          Enumerable.t(),
          fuse_name(),
          RetryOptions.t() | (async_opts :: list()),
          (any() -> retriable_function_result())
        ) :: Enumerable.t()
  def call_async_stream(enumerable, fuse_name, retry_opts_or_async_opts, function)

  def call_async_stream(enumerable, fuse_name, retry_opts = %RetryOptions{}, function)
      when is_function(function),
      do: call_async_stream(enumerable, fuse_name, retry_opts, [], function)

  def call_async_stream(enumerable, fuse_name, async_opts, function)
      when is_list(async_opts) and is_function(function),
      do: call_async_stream(enumerable, fuse_name, %RetryOptions{}, async_opts, function)

  @doc """
  Parallel, streaming version of `ExternalService.call`.

  This function uses Elixir's built-in `Task.async_stream/3` function and the description below is
  taken from there.

  Returns a stream that runs the given function `function` concurrently on each
  item in `enumerable`.

  Each `enumerable` item is passed as argument to the given function `function`
  and processed by its own task. The tasks will be linked to the current
  process, similarly to `async/1`.
  """
  @spec call_async_stream(
          Enumerable.t(),
          fuse_name(),
          RetryOptions.t(),
          async_opts :: list(),
          (any() -> retriable_function_result())
        ) :: Enumerable.t()
  def call_async_stream(enumerable, fuse_name, retry_opts = %RetryOptions{}, async_opts, function)
      when is_list(async_opts) and is_function(function) do
    fun = fn item ->
      call(fuse_name, retry_opts, fn -> function.(item) end)
    end

    Task.async_stream(enumerable, fun, async_opts)
  end

  @spec call_with_retry(fuse_name(), RetryOptions.t(), retriable_function()) ::
          {:no_retry, function_result :: any()}
          | {:error, :retry}
          | {:error, {:retry, reason :: any()}}
          | fuse_blown
          | fuse_not_found
          | (function_result :: any())
  defp call_with_retry(fuse_name, retry_opts, function) do
    require Retry

    Retry.retry with: apply_retry_options(retry_opts), rescue_only: retry_opts.rescue_only do
      case Fuse.ask(fuse_name, :sync) do
        :ok -> try_function(fuse_name, function)
        :blown -> throw(:blown)
        {:error, :not_found} -> throw(:not_found)
      end
    after
      {:no_retry, _} = result -> result
    else
      {:error, :retry} = error -> error
      {:error, {:retry, _reason}} = error -> error
      error -> raise(error)
    end
  catch
    :blown ->
      {:error, {:fuse_blown, fuse_name}}

    :not_found ->
      log_fuse_not_found(fuse_name)
      {:error, {:fuse_not_found, fuse_name}}
  end

  defp apply_retry_options(retry_opts) do
    import Retry.DelayStreams

    delay_stream =
      case retry_opts.backoff do
        {:exponential, initial_delay} -> exponential_backoff(initial_delay)
        {:linear, initial_delay, factor} -> linear_backoff(initial_delay, factor)
      end

    retry_opts
    |> Map.take([:randomize, :expiry, :cap])
    |> Enum.reduce(delay_stream, fn {key, value}, acc ->
      if value do
        apply(Retry.DelayStreams, key, [acc, value])
      else
        acc
      end
    end)
  end

  @spec try_function(fuse_name, retriable_function) ::
          {:error, {:retry, any}} | {:error, :retry} | {:no_retry, any} | no_return
  defp try_function(fuse_name, function) do
    rate_limit = State.get(fuse_name).rate_limit

    case RateLimit.call(rate_limit, function) do
      {:retry, reason} ->
        Fuse.melt(fuse_name)
        {:error, {:retry, reason}}

      :retry ->
        Fuse.melt(fuse_name)
        {:error, :retry}

      result ->
        {:no_retry, result}
    end
  rescue
    error ->
      Fuse.melt(fuse_name)
      reraise error, __STACKTRACE__
  end

  defp log_fuse_not_found(fuse_name) do
    Logger.error(fuse_not_found_message(fuse_name))
  end

  defp fuse_not_found_message(fuse_name) do
    fuse_name = inspect(fuse_name)

    "Fuse :#{fuse_name} not found. To initialize this fuse, call " <>
      "ExternalService.start(:#{fuse_name}) in your application start code."
  end
end