lib/retry/delay_streams.ex

defmodule Retry.DelayStreams do
  @moduledoc """

  This module provides a set of helper functions that produce delay streams for
  use with `retry`.

  """

  @doc """

  Returns an infinite stream of exponentially growing delays.

  The first delay is `initial_delay`; every following delay is the previous one
  multiplied by `factor` and rounded to the nearest integer.

  Example

      retry with: exponential_backoff() do
        # ...
      end

  """
  @spec exponential_backoff(pos_integer(), pos_integer()) :: Enumerable.t()
  def exponential_backoff(initial_delay \\ 10, factor \\ 2) do
    # Emit the current delay and carry its scaled, rounded successor forward.
    grow = fn current -> {current, round(current * factor)} end
    Stream.unfold(initial_delay, grow)
  end

  @doc """

  Returns a stream in which every element of `delays` is replaced by a random
  integer between 1 and that delay (after truncating the delay to an integer).
  A delay that truncates to zero or less becomes 0.

  Example

      retry with: exponential_backoff() |> jitter() do
        # ...
      end

  """
  @spec jitter(Enumerable.t()) :: Enumerable.t()
  def jitter(delays) do
    # Truncate first so fractional delays are accepted by the integer-only RNG helper.
    Stream.map(delays, &random_uniform(trunc(&1)))
  end

  @doc """

  Returns an infinite stream of linearly growing delays.

  The first delay is `initial_delay`; each following delay is `factor` larger
  than the one before it, i.e. the n-th delay (counting from zero) is
  `initial_delay + n * factor`.

  Example

      retry with: linear_backoff(50, 2) do
        # ...
      end

  """
  @spec linear_backoff(pos_integer(), pos_integer()) :: Enumerable.t()
  def linear_backoff(initial_delay, factor) do
    # Count attempts 0, 1, 2, ... and turn each count into its delay.
    0
    |> Stream.iterate(&(&1 + 1))
    |> Stream.map(fn attempt -> initial_delay + attempt * factor end)
  end

  @doc """

  Returns an infinite stream that yields the same `delay` every time.

  Example

      retry with: constant_backoff(50) do
        # ...
      end

  """
  @spec constant_backoff(pos_integer()) :: Enumerable.t()
  def constant_backoff(delay \\ 100) do
    # Cycling a one-element list repeats that element forever.
    Stream.cycle([delay])
  end

  @doc """

  Returns a stream in which each element of `delays` is randomly shifted up or
  down by no more than `proportion` of that delay.

  The shift is a whole number drawn uniformly from `-max_delta..max_delta`,
  where `max_delta` is `proportion` of the delay rounded to the nearest integer.
  Results are never negative: anything that would fall at or below zero becomes 0.

  Example

      retry with: exponential_backoff() |> randomize do
        # ...
      end

  Produces an exponentially increasing delay stream where each delay is randomly
  adjusted to be within 10 percent of the original value

  """
  @spec randomize(Enumerable.t(), float()) :: Enumerable.t()
  def randomize(delays, proportion \\ 0.1) do
    Stream.map(delays, fn d ->
      # Largest permitted deviation; `abs/1` keeps the range well-formed even for
      # a negative delay or proportion.
      max_delta = abs(round(d * proportion))

      # `:rand.uniform/1` returns 1..n, so draw from the 2 * max_delta + 1
      # possible shifts and re-centre on zero. This makes -max_delta reachable
      # and keeps the distribution symmetric around the original delay.
      shift = :rand.uniform(2 * max_delta + 1) - 1 - max_delta

      case d + shift do
        n when n <= 0 -> 0
        n -> n
      end
    end)
  end

  @doc """

  Returns a stream identical to `delays` except that no delay is larger than
  `max`. This allows capping the wait between attempts at some maximum value.

  Example

      retry with: exponential_backoff() |> cap(10_000) do
        # ...
      end

  Produces an exponentially increasing delay stream until the delay reaches 10
  seconds, at which point it stops increasing

  """
  @spec cap(Enumerable.t(), pos_integer()) :: Enumerable.t()
  def cap(delays, max) do
    # `min/2` returns its first argument on ties, so a delay equal to `max`
    # passes through unchanged.
    Stream.map(delays, &min(&1, max))
  end

  @doc """

  Returns a delay stream that is the same as `delays` except it limits the total
  life span of the stream to `time_budget` milliseconds. The budget starts
  counting when the stream is first enumerated, and the calculation takes the
  execution time of the block being retried into account.

  The execution of the code within the block will not be interrupted, so
  the total time of execution may run over the `time_budget` depending on how
  long a single try will take.

  When the next delay would reach or exceed the time remaining, the stream
  emits one final delay equal to the time remaining and then ends. That final
  delay is never shorter than `min_delay`; delays emitted before it are passed
  through unchanged.

  Example

      retry with: exponential_backoff() |> expiry(1_000) do
        # ...
      end

  Produces a delay stream that ends once 1 second has elapsed since it started
  being consumed.

  """
  @spec expiry(Enumerable.t(), pos_integer(), pos_integer()) :: Enumerable.t()
  def expiry(delays, time_budget, min_delay \\ 100) do
    Stream.resource(
      # The deadline is fixed lazily, when enumeration starts. A monotonic clock
      # is used so wall-clock adjustments cannot stretch or shrink the budget.
      fn -> {delays, System.monotonic_time(:millisecond) + time_budget} end,
      fn
        :at_end -> {:halt, :at_end}
        {remaining_delays, end_t} -> reduce_delays(remaining_delays, end_t, min_delay)
      end,
      fn _ -> :noop end
    )
  end

  # Computes the next step of the `Stream.resource/3` built by `expiry/3`.
  #
  # `end_t` is a deadline on the monotonic millisecond clock. Returns either a
  # one-element list with the next delay plus the follow-up state, or a halt
  # tuple once `remaining_delays` is exhausted.
  #
  # Note: `Enum.take/2` enumerates `remaining_delays` from its start on every
  # step and each step wraps it in another `Stream.drop/2`, so `delays` must be
  # safe to enumerate repeatedly.
  defp reduce_delays(remaining_delays, end_t, min_delay) do
    case Enum.take(remaining_delays, 1) do
      [preferred_delay] ->
        now_t = System.monotonic_time(:millisecond)
        # Time left in the budget, floored at `min_delay`.
        remaining_t = max(end_t - now_t, min_delay)

        if preferred_delay >= remaining_t or remaining_t == min_delay do
          # one last try before time budget is exceeded
          {[remaining_t], :at_end}
        else
          # default
          {[preferred_delay], {Stream.drop(remaining_delays, 1), end_t}}
        end

      _ ->
        # reached end of stream - no more tries
        {:halt, :at_end}
    end
  end

  # Picks a random integer in 1..n via `:rand.uniform/1`. Any `n` at or below
  # zero yields 0 instead of being passed to `:rand.uniform/1`.
  defp random_uniform(n) do
    if n > 0, do: :rand.uniform(n), else: 0
  end
end