lib/shopify/graphql/limiter/producer.ex

if Code.ensure_loaded?(GenStage) do
  defmodule Shopify.GraphQL.Limiter.Producer do
    use GenStage

    alias Shopify.GraphQL.{ Config, Helpers, Limiter, Request }

    #
    # client
    #

    @spec idle?(Limiter.name_t()) :: boolean
    def idle?(partition) do
      GenStage.call(name(partition), :idle?)
    end

    @spec name(Limiter.name_t()) :: Limiter.name_t()
    def name(partition) do
      Helpers.Limiter.process_name(partition, Producer)
    end

    @spec send(Limiter.name_t(), Request.t(), Config.t()) :: Shopify.GraphQL.http_response_t()
    def send(partition, request, config) do
      GenStage.call(name(partition), { :send, node(), request, config }, :infinity)
    end

    @spec start_link(Keyword.t()) :: GenServer.on_start()
    def start_link(opts) do
      GenStage.start_link(__MODULE__, opts, hibernate_after: 3_500, name: name(opts[:partition]))
    end

    @spec waiting?(Limiter.name_t()) :: boolean
    def waiting?(partition) do
      GenStage.call(name(partition), :waiting?)
    end

    @spec wait_and_retry(Limiter.name_t(), map, non_neg_integer) :: :ok
    def wait_and_retry(partition, event, wait_for) do
      GenStage.call(name(partition), { :wait_and_retry, event, wait_for })
    end

    #
    # callbacks
    #

    @impl true
    def init(opts) do
      state = Map.new()
      state = Map.put(state, :dead_owners, [])
      state = Map.put(state, :demand, 0)
      state = Map.put(state, :limiter_opts, opts[:limiter_opts])
      state = Map.put(state, :partition, opts[:partition])
      state = Map.put(state, :queue, :queue.new())
      state = Map.put(state, :waiting, false)

      Process.send_after(self(), :sweep_dead_owners, 5_000)

      { :producer, state }
    end

    @impl true
    def handle_call(:idle?, _from, state) do
      { :reply, :queue.is_empty(state[:queue]), [], state }
    end

    @impl true
    def handle_call({ :send, node, request, config }, { pid, _ref } = from, state) do
      Process.monitor(pid)

      event = Map.new()
      event = Map.put(event, :config, config)
      event = Map.put(event, :limiter_opts, state[:limiter_opts])
      event = Map.put(event, :partition, state[:partition])
      event = Map.put(event, :request, request)
      event = Map.put(event, :node, node)
      event = Map.put(event, :owner, from)

      queue = :queue.in(event, state[:queue])

      if state[:waiting] do
        { :noreply, [], %{ state | queue: queue } }
      else
        demand = Map.get(state, :demand)

        { { events, event_len }, queue } = Helpers.Queue.take(queue, demand)

        demand = demand - event_len

        { :noreply, events, %{ state | demand: demand, queue: queue } }
      end
    end

    @impl true
    def handle_call(:waiting?, _from, state) do
      { :reply, state[:waiting], [], state }
    end

    @impl true
    def handle_call({ :wait_and_retry, event, wait_for }, _from, state) do
      state[:timer] && Process.cancel_timer(state[:timer])

      dead_owners = Map.get(state, :dead_owners)

      { pid, _ } = event[:owner]

      alive? = !Enum.any?(dead_owners, fn({ dead_pid, _ }) -> dead_pid == pid end)

      queue = (alive? && :queue.in_r(event, state[:queue])) || state[:queue]

      state = Map.put(state, :queue, queue)
      state = Map.put(state, :timer, Process.send_after(self(), :start, wait_for))
      state = Map.put(state, :waiting, true)

      { :reply, :ok, [], state }
    end

    @impl true
    def handle_demand(demand, state) do
      demand = demand + Map.get(state, :demand, 3)

      if state[:waiting] do
        { :noreply, [], %{ state | demand: demand } }
      else
        { { events, event_len }, queue } = Helpers.Queue.take(state[:queue], demand)

        demand = demand - event_len

        { :noreply, events, %{ state | demand: demand, queue: queue } }
      end
    end

    @impl true
    def handle_info({ :DOWN, _ref, :process, pid, _reason }, state) do
      dead_owners = Map.get(state, :dead_owners)
      dead_owners = dead_owners ++ [{ pid, DateTime.utc_now() }]

      queue = Map.get(state, :queue)
      queue = :queue.delete_with(fn(%{ owner: { owner, _ } }) -> owner == pid end, queue)

      { :noreply, [], %{ state | dead_owners: dead_owners, queue: queue } }
    end

    @impl true
    def handle_info(:start, state) do
      demand = Map.get(state, :demand)

      { { events, event_len }, queue } = Helpers.Queue.take(state[:queue], demand)

      demand = demand - event_len

      state = Map.put(state, :demand, demand)
      state = Map.put(state, :queue, queue)
      state = Map.put(state, :timer, nil)
      state = Map.put(state, :waiting, false)

      { :noreply, events, state }
    end

    def handle_info(:sweep_dead_owners, state) do
      now = DateTime.utc_now()

      dead_owners = Map.get(state, :dead_owners)
      dead_owners = Enum.drop_while(dead_owners, fn({ _, time_of_death }) -> DateTime.diff(now, time_of_death) >= 45 end)

      Process.send_after(self(), :sweep_dead_owners, 5_000)

      { :noreply, [], %{ state | dead_owners: dead_owners } }
    end
  end
end