lib/nostrum/api/ratelimiter.ex

defmodule Nostrum.Api.Ratelimiter do
  @moduledoc """
  Handles REST calls to the Discord API while respecting ratelimits.


  ## Purpose

  Discord's API returns information about ratelimits that we must respect. This
  module serializes API requests through a single process, thus preventing
  concurrency issues from arising if two processes make a remote API call at
  the same time.


  > ### Internal module {: .info}
  >
  > This module is intended for exclusive usage inside of nostrum, and is
  > documented for completeness and people curious to look behind the covers.


  ## Asynchronous requests

  The ratelimiter is fully asynchronous internally. In theory, it also supports
  queueing requests in an asynchronous manner. However, support for this is
  currently not implemented in `Nostrum.Api`.

  If you want to make one or multiple asynchronous requests manually, you can
  use the following pattern:

  ```elixir
  req = :gen_statem.send_request(Nostrum.Api.Ratelimiter, {:queue, request})
  # ...
  response = :gen_statem.receive_response(req, timeout)
  ```

  where `request` is a map describing the request to run - see `Nostrum.Api`
  for more information. You can also send multiple requests at the same time
  and wait for their response: see `:gen_statem.reqids_add/3` and
  `:gen_statem.wait_response/3` for more information.
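
  For example, one way of dispatching several requests at once and collecting
  the responses could look like the following sketch, assuming `requests` is a
  list of request maps and using each request's route as its label:

  ```elixir
  collection =
    Enum.reduce(requests, :gen_statem.reqids_new(), fn request, acc ->
      :gen_statem.send_request(
        Nostrum.Api.Ratelimiter,
        {:queue, request},
        request.route,
        acc
      )
    end)

  # Repeat until the collection is empty. Note that this match crashes on
  # `timeout` and `no_request`: handle those cases in real code.
  {{:reply, response}, route, remaining_collection} =
    :gen_statem.wait_response(collection, :timer.seconds(30), true)
  ```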


  ## Multi-node

  If a single global process is desired to handle all ratelimiting, the
  ratelimiter can theoretically be adjusted to start registered via `:global`.
  In practice, it may be more beneficial to have a local ratelimiter process on
  each node and either use the local one for any API calls, or use a consistent
  hashing mechanism to distribute API requests around the cluster as needed.
  **Do note that the API enforces a global user ratelimit across all
  requests**. With a single process, the ratelimiter can track this without
  hitting 429s at all; with multiple ratelimiters, the built-in requeue
  functionality may or may not help.
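
  A minimal sketch of such a theoretical adjustment - the rest of nostrum would
  also need to be taught to talk to the globally registered name, and `opts`
  and `request` stand in for the usual arguments:

  ```elixir
  # In `start_link/1`, register globally instead of locally:
  :gen_statem.start_link(
    {:global, Nostrum.Api.Ratelimiter},
    Nostrum.Api.Ratelimiter,
    [],
    opts
  )

  # Clients on any node then queue requests via the global name:
  :gen_statem.call({:global, Nostrum.Api.Ratelimiter}, {:queue, request})
  ```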


  ## Inner workings

  When a client process wants to perform some request on the Discord API, it
  sends a request to the `:gen_statem` behind this module to ask it to `:queue`
  the incoming request.


  ### Connection setup

  If the state machine is not connected to the HTTP endpoint, it will
  transition to the `:connecting` state and try to open the connection. If this
  succeeds, it transitions to the `:connected` state.

  ### Queueing requests

  The state machine associates a `t::queue.queue/1` of `t:queued_request/0`
  with each individual bucket, together with an internal count of remaining
  calls. When queueing requests, the following cases occur:

  - If there are no remaining calls in the bot's global ratelimit bucket or
  there are no remaining calls in the bucket, the request is put into the
  bucket's queue.

  - If there is an `:initial` running request to the bucket, the request is put
  into the bucket's queue.

  - If there are more than 0 remaining calls on both the request-specific
  bucket and the global bucket, the request is started right away. This allows
  nostrum to dispatch multiple requests to the same endpoint as soon as
  possible as long as calls remain.

  - If no ratelimit information is known for the bucket and calls remain on the
  global bucket, the request is sent out as the "pioneer" request that will
  retrieve how many calls we have for this bucket (`:initial`, see above).

  - If none of the above is true, a new queue is created and the pending
  request is marked as the `:initial` request. It will be run as soon as the
  bot's global limit expires.
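
  In code, this decision roughly looks as follows - a condensed sketch of the
  `:queue` handling in the `:connected` state further down, where `outstanding`
  maps buckets to `{remaining, queue}` pairs, `remaining_in_window` tracks the
  bot-wide limit, and each branch evaluates to an atom naming the action taken:

  ```elixir
  case Map.get(outstanding, bucket) do
    # Bucket exhausted, initial probe in flight, or no bot-wide calls left:
    # join the waiting line.
    {remaining, _queue} when remaining in [0, :initial] or remaining_in_window == 0 ->
      :join_the_waiting_line

    # Calls remain on both the bucket and the bot-wide window: run right away.
    {remaining, _queue} when remaining > 0 and remaining_in_window > 0 ->
      :run_right_away

    # Unknown bucket with bot-wide calls left: send the "pioneer" request.
    nil when remaining_in_window > 0 ->
      :run_as_initial_probe

    # Unknown bucket, but the bot-wide window is exhausted: create the queue
    # and wait for the window to reset.
    nil ->
      :create_queue_and_wait
  end
  ```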

  The request starting logic (the internal `:next` event) will start new
  requests from the queue as long as more calls are possible in the timeframe.
  These requests are started asynchronously. Bookkeeping is set up to associate
  the resulting `t::gun.stream_ref/0` with the original client along with its
  request and the ratelimiter bucket.

  Results from the HTTP connection are delivered without blocking: responses
  consisting of only a status code and no body (code `204`) arrive in a single
  message, while other responses are delivered to us incrementally. To deliver
  the full response body to the client with the final package, an internal
  buffer of the body is kept. A possible future optimization could be having
  `:gun` send the ratelimiter state machine only the initial `:gun_response`
  and forward each piece of the body directly to the client.

  When the headers for a request have been received, the ratelimiter parses the
  ratelimit information and starts an internal timer that fires when the
  ratelimits expire. It will also reschedule calls with the `:next` internal
  event for as many remaining calls as it knows about. Once the timer expires
  for the current bucket, two cases can happen:

  - The queue has items: Schedule all items and repeat this later.

  - The queue is empty: Delete the queue and remaining calls from the
  outstanding buckets.

  In practice, this means that we never store more information than we need,
  and it removes the regular bucket sweeping that the previous ratelimit bucket
  storage required.
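
  Condensed, the expiry handling for a bucket that still has outstanding state
  looks roughly like this - mirroring the `{:timeout, bucket}` clauses of the
  `:connected` state below, with `outstanding` and `bucket` as described above:

  ```elixir
  {_remaining, queue} = Map.fetch!(outstanding, bucket)

  case :queue.out(queue) do
    # Nothing is waiting: forget everything we know about the bucket.
    {:empty, _queue} ->
      Map.delete(outstanding, bucket)

    # Something is waiting: it becomes the new `:initial` request for the
    # bucket and is run right away to probe the fresh limits.
    {{:value, {_request, _from}}, updated_queue} ->
      Map.put(outstanding, bucket, {:initial, updated_queue})
  end
  ```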

  **Global ratelimits** (note this is a distinct ratelimit from the bot's
  "global", per-user ratelimit) are handled with the special `global_limit`
  state. This state is entered for exactly the `X-Ratelimit-Reset-After`
  time provided in the global ratelimit response. This state does nothing apart
  from postponing any events it receives and returning to the previous state
  (`:connected`) once the global timeout has passed. Requests that failed
  because of the global ratelimit are requeued after returning into the regular
  state: a warning is logged to inform you of this.


  ### Failure modes

  #### HTTP connection death

  If the HTTP connection dies, the ratelimiter will inform each affected client
  by replying with `{:error, {:connection_died, reason}}`, where `reason` is
  the reason as provided by the `:gun_down` event. It will then transition to
  the `:disconnected` state. If no requests were running at the time the
  connection was shut down - for instance, because we simply reached the
  maximum idle time on the HTTP/2 connection - we simply move on.
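
  From a client's perspective, this surfaces as just another error tuple from
  `queue/1`. A sketch of handling it, assuming `request` is a request map as
  built by `Nostrum.Api`:

  ```elixir
  case Nostrum.Api.Ratelimiter.queue(request) do
    {:ok, body} ->
      {:ok, body}

    # Response with status code 204 and no body.
    {:ok} ->
      :ok

    # The HTTP/2 connection died whilst our request was running.
    {:error, {:connection_died, reason}} ->
      {:error, reason}

    {:error, %Nostrum.Error.ApiError{} = error} ->
      {:error, error}
  end
  ```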

  #### Upstream errors

  The ratelimiter works by queueing requests aggressively as soon as it has
  ratelimit information to do so. If no ratelimit information is available -
  for instance, because Discord returned a 502 status code - the ratelimiter
  will not automatically kick the queue to start running further requests.

  #### Other internal issues

  Any other internal problems that are not handled appropriately in the
  ratelimiter will crash it, effectively resulting in the complete loss of any
  queued requests.


  ### Implementation benefits & drawbacks

  #### A history of ratelimiting

  First, it is important to give a short history of nostrum's ratelimiting: pre
  `0.8`, nostrum used to use a `GenServer` that would call out to ETS tables to
  look up ratelimiting buckets for requests. If it needed to sleep before
  issuing a request due to the bucket being exhausted, it would do so in the
  server process and block other callers.

  In nostrum 0.8, the existing ratelimiter bucket storage architecture was
  refactored to be based around the [pluggable caching
  functionality](../advanced/pluggable_caching.md), and buckets with no
  remaining calls were adjusted to be slept out on the client side by having
  the `GenServer` respond to the client with `{:error, {:retry_after, millis}}`
  and the client retrying to schedule its requests. This allowed users to
  distribute their ratelimit buckets however they wished; out of the box,
  nostrum shipped with an ETS-based and a Mnesia-based ratelimit bucket store.


  #### Problems we solved

  The approach above still came with a few problems:

  - Requests were still being performed synchronously in the ratelimiter, and
  it was blocked from doing anything else whilst running them, even though we
  are theoretically free to start requests for other buckets while one is
  still running.

  - The ratelimiter itself partly worked on its own, but partly required the
  external storage mechanisms, which made the code hard to follow and required
  regular automatic pruning because the store had no way of knowing on its own
  when a bucket was no longer relevant.

  - Requests would not be pipelined to run as soon as possible.

  - The ratelimiter did not inform clients if their request died in-flight.

  - If the client disconnected before we returned the response, we had to
  handle this explicitly via `handle_info`.

  The new state machine-based ratelimiter solves these problems.
  """

  @behaviour :gen_statem

  alias Nostrum.Api.Base
  alias Nostrum.Constants
  alias Nostrum.Error.ApiError

  require Logger

  @major_parameters ["channels", "guilds", "webhooks"]
  @registered_name __MODULE__
  # Discord mandates a specific number of API calls we may make across (almost)
  # all endpoints per second. See the "global rate limit" documentation for
  # more information:
  # https://discord.com/developers/docs/topics/rate-limits#global-rate-limit
  @bot_calls_per_window 50
  @bot_calls_time_window :timer.seconds(1)
  @bot_calls_timeout_event :reset_bot_calls_window

  @typedoc """
  A bucket for endpoints under the same ratelimit.
  """
  @typedoc since: "0.9.0"
  @type bucket :: String.t()

  @typedoc """
  A request to make in the ratelimiter.
  """
  @typedoc since: "0.9.0"
  @type request :: %{
          method: :get | :post | :put | :delete,
          route: String.t(),
          body: iodata(),
          headers: [{String.t(), String.t()}],
          params: Enum.t()
        }

  @typedoc """
  A bucket-specific request waiting to be queued, alongside its client.
  """
  @typedoc since: "0.9.0"
  @type queued_request :: {request(), client :: :gen_statem.from()}

  @typedoc """
  Remaining calls on a route, as provided by the API response.

  The ratelimiter internally counts the remaining calls per route to dispatch
  new requests as soon as it's capable of doing so, but this is only possible
  if the API already provided us with ratelimit information for an endpoint.

  Therefore, when the initial call on an endpoint is made, the special
  `:initial` value is used. The limit parsing function uses this value to set
  the remaining calls if and only if the response is for the initial call -
  otherwise, the value would no longer represent the truth.
  """
  @typedoc since: "0.9.0"
  @type remaining :: non_neg_integer() | :initial

  @typedoc """
  The state of the ratelimiter.

  While this has no public use, it is still documented here to provide help
  when tracing the ratelimiter via `:sys.trace/2` or other means.

  ## Fields

  - `:outstanding`: Outstanding (not yet started) requests per bucket alongside
  the remaining calls that may be made on said bucket.

  - `:running`: Requests that have been sent off. Used to associate back the
  client with a request when the response comes in.

  - `:inflight`: Requests for which we have started getting a response, but we
  have not fully received it yet. For responses that have a body, this will
  buffer their body until we can send it back to the client.

  - `:conn`: The `:gun` connection backing the server. Used for making new
  requests, and updated as the state changes.

  - `:remaining_in_window`: How many calls we may still make to the API during
  this time window. Reset automatically via timeouts.
  """
  @typedoc since: "0.9.0"
  @type state :: %{
          outstanding: %{
            bucket => {remaining, :queue.queue(queued_request)}
          },
          running: %{
            :gun.stream_ref() => {bucket(), request(), :gen_statem.from()}
          },
          inflight: %{
            :gun.stream_ref() =>
              {status :: non_neg_integer(), headers :: [{String.t(), String.t()}],
               body :: String.t()}
          },
          conn: pid() | nil,
          remaining_in_window: non_neg_integer()
        }

  @doc """
  Starts the ratelimiter.
  """
  @spec start_link([:gen_statem.start_opt()]) :: :gen_statem.start_ret()
  def start_link(opts) do
    :gen_statem.start_link({:local, @registered_name}, __MODULE__, [], opts)
  end

  def init([]) do
    # Uncomment the following to trace everything the ratelimiter is doing:
    #   me = self()
    #   spawn(fn -> :sys.trace(me, true) end)
    # See more examples in the `sys` docs.
    {:ok, :disconnected, empty_state()}
  end

  def callback_mode, do: :state_functions

  def child_spec(opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts]},
      type: :worker,
      restart: :permanent,
      shutdown: 500
    }
  end

  # The Glorious State Machine
  # Inspired by Peter Morgan's "Postpone: Resource Allocation on Demand"
  #   https://shortishly.com/blog/postpone-resource-allocation-on-demand/

  def disconnected({:call, _from}, _request, data) do
    {:next_state, :connecting, data,
     [
       {:next_event, :internal, :open},
       {:state_timeout, :timer.seconds(10), :connect_timeout},
       :postpone
     ]}
  end

  # We were informed that a bucket expired, and we have outstanding requests
  # (that were previously ratelimited) for that bucket. We need to connect and
  # run it - just like when receiving a queue request above.
  def disconnected({:timeout, bucket}, :expired, %{outstanding: outstanding} = data)
      when is_map_key(outstanding, bucket) do
    {:next_state, :connecting, data,
     [
       {:next_event, :internal, :open},
       {:state_timeout, :timer.seconds(10), :connect_timeout},
       :postpone
     ]}
  end

  # We received a timeout for a bucket that does not have any pending requests.
  # This means that the remaining requests got to exactly 0 before we ceased
  # sending further requests.
  def disconnected({:timeout, _bucket}, :expired, _data) do
    :keep_state_and_data
  end

  def connecting(:internal, :open, data) do
    domain = to_charlist(Constants.domain())

    open_opts = %{
      connect_timeout: :timer.seconds(5),
      domain_lookup_timeout: :timer.seconds(5),
      # Do not retry here. If we retry, it is possible that after the state
      # machine heads into disconnected state, it receives an unexpected
      # `:gun_up` message. We want the state machine to manage the connection
      # lifecycle fully on its own.
      retry: 0,
      tls_handshake_timeout: :timer.seconds(5),
      tls_opts: Constants.gun_tls_opts()
    }

    {:ok, conn_pid} = :gun.open(domain, 443, open_opts)
    {:keep_state, %{data | conn: conn_pid}}
  end

  def connecting(:info, {:gun_up, conn_pid, _}, %{conn: conn_pid} = data) do
    {:next_state, :connected, data}
  end

  def connecting({:call, _from}, _request, _data) do
    {:keep_state_and_data, :postpone}
  end

  def connecting({:timeout, _bucket}, :expired, _data) do
    {:keep_state_and_data, :postpone}
  end

  def connecting(:state_timeout, :connect_timeout, _data) do
    {:stop, :connect_timeout}
  end

  # Client request: Queue the given request, and respond when we have the response.
  def connected({:call, from}, {:queue, request}, _data) do
    {:keep_state_and_data, {:next_event, :internal, {:queue, {request, from}}}}
  end

  # Enqueue the passed request.
  def connected(
        :internal,
        {:queue, {payload, from}},
        %{outstanding: outstanding, remaining_in_window: remaining_in_window} = data
      ) do
    bucket = get_endpoint(payload.route, payload.method)

    # The outstanding map contains pairs in the form `{remaining, queue}`,
    # where `remaining` is the amount of remaining calls we may make, and
    # `queue` is the waiting line of requests. If the ratelimit on the bucket
    # expires, the internal timeout event will automatically reschedule queued
    # requests (starting with a single one to get the calls we may make).
    case Map.get(outstanding, bucket) do
      # We have no remaining calls on the bucket, on the bot user, or the
      # initial call to get rate limiting information is in flight. Let's join
      # the waiting line.
      {remaining, queue} when remaining in [0, :initial] or remaining_in_window == 0 ->
        entry = {payload, from}

        data_with_this_queued =
          put_in(data, [:outstanding, bucket], {remaining, :queue.in(entry, queue)})

        {:keep_state, data_with_this_queued}

      # There is an entry - so somebody did find some ratelimiting information
      # here recently - but that entry tells us we may make a call right away.
      {remaining, queue} when remaining > 0 and remaining_in_window > 0 ->
        # Sanity check. This can be removed after release is considered stable.
        # Why should this be empty?
        # Because when we receive ratelimit information and see that there are
        # still items in the queue, we should internally schedule them right away.
        # Otherwise, we are mixing up the order.
        true = :queue.is_empty(queue)
        {:keep_state_and_data, [{:next_event, :internal, {:run, payload, bucket, from}}]}

      # There is no entry. We are the pioneer for this bucket...
      nil when remaining_in_window > 0 ->
        # ... and we are good on the bot ratelimits!

        # Since we don't have any explicit ratelimiting information for this
        # bucket yet, we set the remaining calls to the special `:initial`
        # value. The ratelimit response header parser uses this value to know
        # when it should update ratelimit information from upstream, and new
        # incoming requests will be held off appropriately.
        run_request = {:next_event, :internal, {:run, payload, bucket, from}}
        data_with_new_queue = put_in(data, [:outstanding, bucket], {:initial, :queue.new()})
        {:keep_state, data_with_new_queue, [run_request]}

      nil ->
        # ... but we are not good on the bot ratelimits! Add this to the queue.
        entry = {payload, from}

        queue = :queue.new()

        data_with_this_queued =
          put_in(data, [:outstanding, bucket], {:initial, :queue.in(entry, queue)})

        {:keep_state, data_with_this_queued}
    end
  end

  def connected(:internal, {:requeue, {_payload, _from} = request}, _data) do
    {:keep_state_and_data, {:next_event, :internal, {:queue, request}}}
  end

  # Run the given request right now, and do any bookkeeping. We assert we are
  # good on remaining calls, so we can just run the request.
  def connected(
        :internal,
        {:run, request, bucket, from},
        %{conn: conn, outstanding: outstanding, remaining_in_window: remaining_for_user} = data
      )
      when remaining_for_user > 0 and is_map_key(outstanding, bucket) do
    stream =
      Base.request(
        conn,
        request.method,
        request.route,
        request.body,
        request.headers,
        request.params
      )

    data_with_this_running = put_in(data, [:running, stream], {bucket, request, from})
    {:keep_state, data_with_this_running, {:next_event, :internal, {:account_request, bucket}}}
  end

  # Account for the request on the given bucket. If the user (global) window
  # is still untouched, start the timer that resets it once the window elapses.
  # Regardless of the user bucket, account for the individual request.
  def connected(
        :internal,
        {:account_request, _bucket} = request,
        %{remaining_in_window: @bot_calls_per_window} = data
      ) do
    {:keep_state, %{data | remaining_in_window: @bot_calls_per_window - 1},
     [
       {{:timeout, @bot_calls_timeout_event}, @bot_calls_time_window, :expired},
       {:next_event, :internal, request}
     ]}
  end

  def connected(
        :internal,
        {:account_request, bucket},
        %{remaining_in_window: remaining_in_window, outstanding: outstanding} = data
      )
      when remaining_in_window > 0 do
    %{^bucket => entry} = outstanding

    case entry do
      {:initial, _queue} ->
        # This was the first request here; we only need to account for the
        # bot-wide (user) call.
        {:keep_state, %{data | remaining_in_window: remaining_in_window - 1}}

      {remaining_in_bucket, queue} ->
        {:keep_state,
         %{
           data
           | outstanding: %{outstanding | bucket => {remaining_in_bucket - 1, queue}},
             remaining_in_window: remaining_in_window - 1
         }}
    end
  end

  # `:next` will run the next `remaining` requests for the given bucket's
  # queue, and stop as soon as no more entries are found, or the user limit has
  # been reached for this window.
  def connected(:internal, {:next, 0, _bucket}, _data) do
    :keep_state_and_data
  end

  def connected(:internal, {:next, _remaining, _bucket}, %{remaining_in_window: 0}) do
    :keep_state_and_data
  end

  # This is the initial request to this bucket, and we want to run the next
  # request. That happens if the request's bucket was not affected by the
  # ratelimits at the time of queueing, but the bot was in the user ratelimit
  # state. Treat it as "we have one request remaining", because that initial
  # request will probe for how many remaining requests we actually have (and
  # the marker value will prevent further requests from executing).
  def connected(:internal, {:next, :initial, bucket}, _data) do
    {:keep_state_and_data, {:next_event, :internal, {:next, 1, bucket}}}
  end

  # Run the next request for the given bucket, with > 0 and non-initial remaining calls.
  def connected(:internal, {:next, remaining, bucket}, %{outstanding: outstanding} = data) do
    # "_remaining" here could be either the `remaining` value from above
    # or `:initial` in the case where we're doing a global-limit requeue.
    {_remaining, queue} = Map.fetch!(outstanding, bucket)

    case :queue.out(queue) do
      {:empty, _queue} ->
        # Nobody wants to run anything on the bucket. We can hop out.
        :keep_state_and_data

      {{:value, {request, from}}, updated_queue} ->
        # We found a request we can run. Start it, and then see if we can run
        # another one, repeating the cycle until we have exhausted either the
        # queue of waiting requests or the remaining calls on the endpoint.
        accounted_remaining = remaining - 1

        # The `:run` function will take care of updating the remaining calls in
        # the `outstanding` map.
        outstanding_without_this = Map.put(outstanding, bucket, {remaining, updated_queue})

        run_request = {:next_event, :internal, {:run, request, bucket, from}}
        try_starting_next = {:next_event, :internal, {:next, accounted_remaining, bucket}}

        {:keep_state, %{data | outstanding: outstanding_without_this},
         [run_request, try_starting_next]}
    end
  end

  # Requests were recently paused due to the user (bot-wide) limit. Unpause as
  # many as we can.
  def connected(:internal, {:unpause_requests, [bucket | buckets]}, %{
        remaining_in_window: remaining,
        outstanding: outstanding
      })
      when remaining > 0 do
    {stored_remaining, _queue} = Map.fetch!(outstanding, bucket)
    Logger.warning("Requeueing request to #{inspect(bucket)} after user limit")
    # Due to the way that `:next` is implemented, a large queue on a single
    # bucket may instantly exhaust the user calls for this window.
    {:keep_state_and_data,
     [
       {:next_event, :internal, {:next, stored_remaining, bucket}},
       {:next_event, :internal, {:unpause_requests, buckets}}
     ]}
  end

  # No more things to unpause.
  def connected(:internal, {:unpause_requests, []}, _data) do
    :keep_state_and_data
  end

  # Hmm, we'll try again later.
  def connected(:internal, {:unpause_requests, buckets}, %{remaining_in_window: 0}) do
    Logger.warning(
      "Unpaused a few requests since the user ratelimit has reset, but #{length(buckets)} bucket(s) are left in the outstanding queue. If the amount of requests stays above the user ratelimit, a backlog may grow"
    )

    :keep_state_and_data
  end

  # Our user call window has elapsed - put the remaining calls back to
  # Discord's documented value.
  # Case 1: We still had some calls remaining, so no request queues paused
  # themselves.
  def connected(
        {:timeout, @bot_calls_timeout_event},
        :expired,
        %{remaining_in_window: remaining} = data
      )
      when remaining > 0 do
    {:keep_state, %{data | remaining_in_window: @bot_calls_per_window}}
  end

  # Case 2: We did not have any calls remaining. In addition to resetting it,
  # we need to kickstart our requests again.
  def connected(
        {:timeout, @bot_calls_timeout_event},
        :expired,
        %{remaining_in_window: 0, outstanding: outstanding} = data
      ) do
    Logger.debug("Received user call window reset with no remaining requests, unpausing")

    {:keep_state, %{data | remaining_in_window: @bot_calls_per_window},
     {:next_event, :internal, {:unpause_requests, Map.keys(outstanding)}}}
  end

  # The bucket's ratelimit window has expired: we may make calls again. Or, to
  # be more specific, we may make a single call to find out how many calls we
  # will have remaining in the next window. If there are waiting entries, we
  # start scheduling, unless we are out of bot-wide calls for this time window.
  def connected({:timeout, bucket}, :expired, %{remaining_in_window: 0}) do
    Logger.debug(
      "Ratelimits on #{inspect(bucket)} have expired but we may not queue more requests due to the bot user limit."
    )

    :keep_state_and_data
  end

  def connected(
        {:timeout, bucket},
        :expired,
        %{outstanding: outstanding} = data
      )
      when is_map_key(outstanding, bucket) do
    # "remaining" is mostly worthless here, since the bucket's remaining calls
    # have now reset anyways.
    {_remaining, queue} = Map.fetch!(outstanding, bucket)

    case :queue.out(queue) do
      {:empty, _queue} ->
        # Nobody else has anything to queue, so we're good on cleaning up the bucket.
        {:keep_state, %{data | outstanding: Map.delete(outstanding, bucket)}}

      {{:value, {request, from}}, updated_queue} ->
        # There's more where that came from. Update the stored queue and
        # schedule the request to run instantly. Since this is the initial
        # request to get the new ratelimit, we also set the special marker.
        outstanding_with_this = Map.put(outstanding, bucket, {:initial, updated_queue})
        run_request = {:next_event, :internal, {:run, request, bucket, from}}
        {:keep_state, %{data | outstanding: outstanding_with_this}, [run_request]}
    end
  end

  # Beginning of the response. For responses without a body, this is the
  # complete response. For responses with a body, we set up the buffer here. In
  # either case, we parse the retrieved ratelimiting information here.
  def connected(
        :info,
        {:gun_response, _conn, stream, kind, status, headers},
        %{inflight: inflight, running: running} = data
      ) do
    {bucket, request, from} = Map.fetch!(running, stream)
    response = parse_response(status, headers)
    limits = parse_headers(response)
    parse_limits = {:next_event, :internal, {:parse_limits, limits, bucket}}

    cond do
      kind == :fin and status == 429 ->
        # Uh oh. This better be a user or global ratelimit because our
        # ratelimiter is fast, and not because we've actually exhausted a
        # "standard" bucket.
        running_without_this = Map.delete(running, stream)

        Logger.warning(
          "Requeueing request to #{request.method} #{inspect(request.route)} due to 429"
        )

        {:keep_state, %{data | running: running_without_this},
         [
           # parse_limits will transition to the ratelimit state appropriately
           # for us, and event ordering guarantees that this will be the next
           # event that we deal with, regardless of how many clients are
           # sending requests to us in the meantime. Afterwards, the global limit
           # state will need to deal with the requeue request (most likely by
           # postponing it).
           parse_limits,
           {:next_event, :internal, {:requeue, {request, from}}}
         ]}

      kind == :fin ->
        running_without_this = Map.delete(running, stream)

        {:keep_state, %{data | running: running_without_this},
         [
           {:reply, from, format_response(response)},
           parse_limits
         ]}

      kind == :nofin ->
        inflight_with_this = Map.put(inflight, stream, {status, headers, ""})
        {:keep_state, %{data | inflight: inflight_with_this}, parse_limits}
    end
  end

  def connected(:info, {:gun_data, _conn, stream, :nofin, body}, %{inflight: inflight} = data) do
    inflight_with_buffer =
      Map.update!(
        inflight,
        stream,
        fn {status, headers, buffer} ->
          {status, headers, <<buffer::binary, body::binary>>}
        end
      )

    {:keep_state, %{data | inflight: inflight_with_buffer}}
  end

  def connected(
        :info,
        {:gun_data, _conn, stream, :fin, body},
        %{inflight: inflight, running: running} = data
      ) do
    {{_bucket, request, from}, running_without_this} = Map.pop(running, stream)
    {{status, headers, buffer}, inflight_without_this} = Map.pop(inflight, stream)
    full_buffer = <<buffer::binary, body::binary>>
    unparsed = parse_response(status, headers, full_buffer)
    response = format_response(unparsed)
    new_data = %{data | inflight: inflight_without_this, running: running_without_this}

    case status do
      429 ->
        # Not great - this should ideally be a global or user ratelimit,
        # neither of which the API warns us about in advance.
        # Give the request a second chance. Note that dealing with entering the
        # global or user ratelimit was already performed by `parse_limits` in
        # an earlier step.
        {:keep_state, new_data,
         [
           {:next_event, :internal, {:requeue, {request, from}}}
         ]}

      _ ->
        {:keep_state, new_data, {:reply, from, response}}
    end
  end

  # Parse limits and deal with them accordingly by scheduling the bucket expiry
  # timeout and scheduling the next requests to run as appropriate.
  def connected(
        :internal,
        {:parse_limits, {:bucket_limit, {remaining, reset_after}}, bucket},
        %{outstanding: outstanding} = data
      )
      when remaining >= 0 do
    expire_bucket = {{:timeout, bucket}, reset_after, :expired}

    case Map.fetch(outstanding, bucket) do
      # This is the first response we got for the absolute initial call.
      # Update the remaining value to the reported value.
      {:ok, {:initial, queue}} ->
        updated_outstanding = Map.put(outstanding, bucket, {remaining, queue})

        {:keep_state, %{data | outstanding: updated_outstanding},
         [
           expire_bucket,
           {:next_event, :internal, {:next, remaining, bucket}}
         ]}

      # We already have some information about the remaining calls saved. In
      # that case, don't touch it - just try to reschedule and `:next` will do
      # the rest.
      # Why not update the `remaining` value? If we update it to the value
      # reported in the response, it may jump up again, but the remaining
      # value, once set, must strictly monotonically decrease. We count the
      # requests we have running down to zero in the state machine, but when
      # the first response from Discord comes in, it may tell us we have four
      # remaining calls while in reality multiple other requests are already in
      # flight and ready to cause their ratelimiter engineers some headaches.
      # Therefore, we must rely on our own value (and on the API to not decide
      # to change the ratelimit halfway through the bucket lifetime).
      {:ok, {stored_remaining, _queue}} ->
        {:keep_state_and_data,
         [
           expire_bucket,
           {:next_event, :internal, {:next, stored_remaining, bucket}}
         ]}

      # There is no bucket with outstanding requests anymore.
      # This can happen if the bucket expiry - which empties the stored
      # ratelimit info - occurs before we receive and parse the response here.
      # Under the assumption that the ratelimiter's internal remaining -> 0
      # counting worked properly, this also means that there are no ratelimits
      # left on the route, so we also don't need to reschedule.
      :error ->
        :keep_state_and_data
    end
  end

  def connected(:internal, {:parse_limits, {:user_limit, retry_after}, _bucket}, data) do
    Logger.warning(
      "Hit user limit, transitioning into global limit state for #{retry_after / 1000} seconds"
    )

    {:next_state, :global_limit, data, [{:state_timeout, retry_after, :connected}]}
  end

  def connected(:internal, {:parse_limits, {:global_limit, retry_after}, _bucket}, data) do
    Logger.warning(
      "Hit global limit, transitioning into global limit state for #{retry_after / 1000} seconds"
    )

    {:next_state, :global_limit, data, [{:state_timeout, retry_after, :connected}]}
  end

  # If we did not get any ratelimit headers let's not send further requests to
  # this endpoint until we get another request to send requests to it. This
  # means that the request queue is effectively paused until another response
  # receives ratelimit headers - so individual requests resulting in 500s
  # (which don't send ratelimit headers) will just stop and not cause a train
  # of hurt. New requests that result in positive feedback will then kick the
  # queue again.
  def connected(:internal, {:parse_limits, :congratulations_you_killed_upstream, bucket}, _data) do
    Logger.warning(
      "No ratelimits received on bucket #{bucket}, likely due to a server error. " <>
        "Holding off request queue pipelining until next client request."
    )

    :keep_state_and_data
  end

  # A running request was killed - suboptimal. Log a warning and try again.
  def connected(:info, {:gun_error, _conn, stream, :closed}, %{running: running} = data)
      when is_map_key(running, stream) do
    # Ensure that we do not get further garbage for this stream
    :ok = :gun.flush(stream)

    {{_bucket, request, from}, running_without_it} = Map.pop(running, stream)

    Logger.warning(
      "Request to #{inspect(request.route)} queued by #{request.from} was closed abnormally, requeueing"
    )

    {:keep_state, %{data | running: running_without_it},
     {:next_event, :internal, {:requeue, {request, from}}}}
  end

  def connected(:info, {:gun_down, conn, _, reason, killed_streams}, %{running: running}) do
    # Even with `retry: 0`, gun seems to try and reconnect, potentially because
    # of WebSocket. Force the connection to die.
    :ok = :gun.close(conn)
    :ok = :gun.flush(conn)

    # Streams that we previously received `:gun_error` notifications for have
    # been requeued already, and we won't find them in the `running` list.
    # Respond to any client whose request we won't retry.
    # Note that if other code than the `:gun_error` clause for a closed stream
    # removes the request from the `running` map _and does not requeue it on
    # its own terms_, a client may hang indefinitely.
    replies =
      killed_streams
      |> Stream.map(&Map.get(running, &1))
      |> Stream.reject(&(&1 == nil))
      |> Enum.map(fn {_bucket, _request, client} ->
        {:reply, client, {:error, {:connection_died, reason}}}
      end)

    {:next_state, :disconnected, empty_state(), replies}
  end

  def global_limit(:state_timeout, next, data) do
    {:next_state, next, data}
  end

  # We got some more data after heading into global limit state. This
  # normally happens when we receive a 429 from a request that is marked
  # `:nofin` (so a body will arrive with it), the internal `parse_limits` event
  # puts us into global limit state, and then the response comes in. Let
  # somebody else deal with it.
  def global_limit(:info, {:gun_response, _conn, _stream, _kind, _status, _headers}, _data) do
    {:keep_state_and_data, :postpone}
  end

  # Same as above.
  def global_limit(:info, {:gun_data, _conn, _stream, _fin_or_nofin, _body}, _data) do
    {:keep_state_and_data, :postpone}
  end

  def global_limit({:call, _from}, {:queue, _request}, _data) do
    {:keep_state_and_data, :postpone}
  end

  # Requeue is sent when a regular request is met with a 429, and this path is
  # always hit at least once after entering the global limit state. Instead of
  # returning an error to the client we postpone it until we can deal with it
  # again.
  def global_limit(:internal, {:requeue, {_request, _from}}, _data) do
    {:keep_state_and_data, :postpone}
  end

  def global_limit({:timeout, _bucket}, :expired, _data) do
    {:keep_state_and_data, :postpone}
  end

  # End of state functions

  def code_change(_version, state, data, _extra) do
    {:ok, state, data}
  end

  # End of callback functions

  defp parse_response(status, headers), do: {:ok, {status, headers, ""}}

  defp parse_response(status, headers, buffer),
    do: {:ok, {status, headers, buffer}}

  @spec empty_state :: state()
  defp empty_state,
    do: %{
      outstanding: %{},
      running: %{},
      inflight: %{},
      conn: nil,
      remaining_in_window: @bot_calls_per_window
    }

  # Helper functions

  @doc """
  Queue the given request and wait for the response synchronously.

  Ratelimits on the endpoint are handled by the ratelimiter. Requests that run
  into the user or global ratelimit are requeued automatically: the call blocks
  until a response can be delivered.
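
  An illustrative call - the request maps are normally built by `Nostrum.Api`,
  and the values below are purely hypothetical:

  ```elixir
  request = %{
    method: :get,
    route: "/channels/123456789012345678",
    body: "",
    headers: [],
    params: []
  }

  {:ok, body} = Nostrum.Api.Ratelimiter.queue(request)
  ```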
  """
  def queue(request) do
    :gen_statem.call(@registered_name, {:queue, request})
  end

  @spec value_from_rltuple({String.t(), String.t()}) :: String.t() | nil
  defp value_from_rltuple({_k, v}), do: v

  @spec header_value([{String.t(), String.t()}], String.t(), String.t() | nil) :: String.t() | nil
  defp header_value(headers, key, default \\ nil) do
    headers
    |> List.keyfind(key, 0, {key, default})
    |> value_from_rltuple()
  end

  # defp parse_headers({:error, _reason} = result), do: result

  # credo:disable-for-next-line
  defp parse_headers({:ok, {_status, headers, _body}}) do
    limit_scope = header_value(headers, "x-ratelimit-scope")
    remaining = header_value(headers, "x-ratelimit-remaining")
    remaining = unless is_nil(remaining), do: String.to_integer(remaining)

    reset_after = header_value(headers, "x-ratelimit-reset-after")

    reset_after =
      unless is_nil(reset_after), do: :erlang.trunc(String.to_float(reset_after) * 1000)

    cond do
      is_nil(remaining) and is_nil(reset_after) ->
        :congratulations_you_killed_upstream

      limit_scope == "user" and remaining == 0 ->
        # Per bot or user limit.
        {:user_limit, reset_after}

      limit_scope == "global" and remaining == 0 ->
        # Per bot or user global limit.
        {:global_limit, reset_after}

      !is_nil(remaining) and !is_nil(reset_after) ->
        # Normal bucket limit.
        {:bucket_limit, {remaining, reset_after}}
    end
  end

  @doc """
  Retrieves a proper ratelimit endpoint from a given route and method.
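
  For instance, snowflakes in the route are collapsed to `_id` unless they
  belong to one of the major parameters (illustrative route):

  ```elixir
  get_endpoint("/channels/123456789012345678/messages/987654321098765432", :get)
  #=> "/channels/123456789012345678/messages/_id"
  ```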
  """
  @spec get_endpoint(String.t(), atom()) :: String.t()
  def get_endpoint(route, method) do
    endpoint =
      Regex.replace(~r/\/([a-z-]+)\/(?:[0-9]{17,19})/i, route, fn capture, param ->
        case param do
          param when param in @major_parameters ->
            capture

          param ->
            "/#{param}/_id"
        end
      end)
      |> replace_webhook_token()
      |> replace_emojis()

    if String.ends_with?(endpoint, "/messages/_id") and method == :delete do
      "delete:" <> endpoint
    else
      endpoint
    end
  end

  defp format_response(response) do
    case response do
      # {:error, error} ->
      #  {:error, error}

      {:ok, {status, _, body}} when status in [200, 201] ->
        {:ok, body}

      {:ok, {204, _, _}} ->
        {:ok}

      {:ok, {status, _, body}} ->
        response =
          case Jason.decode(body, keys: :atoms) do
            {:ok, parsed} -> parsed
            _error -> body
          end

        {:error, %ApiError{status_code: status, response: response}}
    end
  end

  defp replace_emojis(endpoint) do
    Regex.replace(
      ~r/\/reactions\/[^\/]+\/?(@me|_id)?/i,
      endpoint,
      "/reactions/_emoji/\\g{1}/"
    )
  end

  defp replace_webhook_token(endpoint) do
    Regex.replace(
      ~r/\/webhooks\/([0-9]{17,19})\/[^\/]+\/?/i,
      endpoint,
      "/webhooks/\\g{1}/_token/"
    )
  end
end