lib/tw/v1_1/cursored_result.ex

defmodule Tw.V1_1.CursoredResult.StreamError do
  @moduledoc """
  Wrap the error that occurred in `Tw.V1_1.CursedResult.stream!/3`.
  """
  defexception [:cursor, :message, :original_error]

  @impl true
  def exception(opts) do
    error = opts[:error]
    %__MODULE__{cursor: opts[:cursor], message: error.message, original_error: error}
  end
end

defmodule Tw.V1_1.CursoredResult do
  @moduledoc """
  See [the Twitter API documentation](https://developer.twitter.com/en/docs/twitter-api/v1/pagination) for details.
  """

  alias Tw.V1_1.CursoredResult.StreamError
  alias Tw.V1_1.TwitterAPIError

  @type cursor :: integer()

  @type t(key, type) :: %{
          key => type,
          next_cursor: cursor(),
          next_cursor_str: binary(),
          previous_cursor: cursor(),
          previous_cursor_str: binary()
        }

  @doc """
  Return if there is a next page from a cursor.

  ## Examples

      iex> {:ok, res} = Tw.V1_1.User.follower_ids(client)
      iex> Tw.V1_1.CursoredResult.has_next?(res)
      true
  """
  @spec has_next?(%{next_cursor: integer}) :: boolean()
  def has_next?(cursored_result) do
    cursored_result.next_cursor != 0
  end

  @doc """
  Return if there is a previous page from a cursor.

  ## Examples

      iex> {:ok, res} = Tw.V1_1.User.follower_ids(client)
      iex> Tw.V1_1.CursoredResult.has_previous?(res)
      false
  """
  @spec has_previous?(%{previous_cursor: integer}) :: boolean()
  def has_previous?(cursored_result) do
    cursored_result.previous_cursor != 0
  end

  @doc """
  Return cursored endpoints results as `Stream`.

  ## Examples

      iex> Tw.V1_1.CursoredResult.stream!(:ids, fn cursor -> Tw.V1_1.User.fllower_ids(client, %{screen_name: "twitterapi", cursor: cursor} end)
      ...> |> Enum.each(&IO.inspect/1)

      iex> Tw.V1_1.CursoredResult.stream!(:ids, fn cursor -> Tw.V1_1.User.fllower_ids(client, %{screen_name: "twitterapi", cursor: cursor} end)
      ...> |> Stream.run()
      ** (Tw.V1_1.CursoredResult.StreamError) Rate limit exceeded

  """
  @spec stream!(atom(), (integer -> {:ok, map} | {:error, Exception.t()}), integer()) :: Enumerable.t()
  def stream!(key, func, initial_cursor \\ -1) do
    Stream.unfold(
      initial_cursor,
      fn
        0 ->
          nil

        cursor ->
          case func.(cursor) do
            {:ok, res} ->
              {Map.get(res, key), res.next_cursor}

            {:error, error} ->
              raise StreamError.exception(cursor: cursor, error: error)
          end
      end
    )
    |> Stream.flat_map(&Function.identity/1)
  end

  @doc """
  Return cursored endpoints results as `Stream`.
  If the rate limit is exceeded, sleep and retry.

  ## Examples

      iex> Tw.V1_1.CursoredResult.persevering_stream!(:ids, fn cursor -> Tw.V1_1.User.fllower_ids(client, %{screen_name: "twitterapi", cursor: cursor}) end)
      ...> |> Enum.each(&IO.inspect/1)


  """
  @spec persevering_stream!(atom(), (integer -> {:ok, map} | {:error, Exception.t()}), integer()) :: Enumerable.t()
  def persevering_stream!(key, func, initial_cursor \\ -1) do
    recursive_func = fn c ->
      g = fn
        0, _ ->
          nil

        cursor, f ->
          case func.(cursor) do
            {:ok, res} ->
              {Map.get(res, key), res.next_cursor}

            {:error, %TwitterAPIError{} = error} ->
              if TwitterAPIError.rate_limit_exceeded?(error) do
                TwitterAPIError.rate_limit_reset_in(error)
                |> Process.sleep()

                f.(cursor, f)
              else
                raise StreamError.exception(cursor: cursor, error: error)
              end

            {:error, error} ->
              raise StreamError.exception(cursor: cursor, error: error)
          end
      end

      g.(c, g)
    end

    Stream.unfold(initial_cursor, recursive_func)
    |> Stream.flat_map(&Function.identity/1)
  end
end