lib/crawly/requests_storage/requests_storage_worker.ex

defmodule Crawly.RequestsStorage.Worker do
  @moduledoc """
  Requests storage worker: a process responsible for storing the requests of
  a given spider.

  Every stored request is piped through its own list of middlewares
  (`request.middlewares`), which pre-process it before it is stored. A
  middleware may also drop a request (the pipeline then yields `false`); this
  is the mechanism used to filter out already seen requests (the
  `fingerprints` approach to detecting visited pages).

  Requests are kept as a stack: `pop/1` returns the most recently stored
  request first.
  """
  require Logger

  use GenServer

  defstruct requests: [], count: 0, spider_name: nil, crawl_id: nil

  alias Crawly.RequestsStorage.Worker

  @doc """
  Store an individual request or a list of requests.

  Each request is piped through its middlewares before being stored; requests
  dropped by a middleware are not stored.
  """
  @spec store(pid(), Crawly.Request.t() | [Crawly.Request.t()]) :: :ok
  def store(pid, %Crawly.Request{} = request), do: store(pid, [request])

  def store(pid, requests) when is_list(requests) do
    do_call(pid, {:store, requests})
  end

  @doc """
  Pop a request out of the requests storage.

  Returns the most recently stored request, or `nil` when the storage is empty.
  """
  @spec pop(pid()) :: Crawly.Request.t() | nil
  def pop(pid) do
    do_call(pid, :pop)
  end

  @doc """
  Get statistics from the requests storage (number of stored requests).
  """
  @spec stats(pid()) :: {:stored_requests, non_neg_integer()}
  def stats(pid) do
    do_call(pid, :stats)
  end

  @doc """
  Returns all scheduled requests (used for some sort of preview).
  """
  @spec requests(pid()) :: {:requests, [Crawly.Request.t()]}
  def requests(pid), do: do_call(pid, :requests)

  @doc """
  Starts an (unnamed) requests storage worker for the given spider and crawl.
  """
  @spec start_link(term(), term()) :: GenServer.on_start()
  def start_link(spider_name, crawl_id) do
    GenServer.start_link(__MODULE__, [spider_name, crawl_id])
  end

  @impl true
  def init([spider_name, crawl_id]) do
    # Attach spider/crawl identifiers to every log line emitted by this process.
    Logger.metadata(spider_name: spider_name, crawl_id: crawl_id)

    Logger.debug(
      "Starting requests storage worker for #{inspect(spider_name)}..."
    )

    {:ok, %Worker{requests: [], spider_name: spider_name, crawl_id: crawl_id}}
  end

  # Store the given requests, piping each one through its middlewares.
  @impl true
  def handle_call({:store, requests}, _from, state) do
    new_state = Enum.reduce(requests, state, &pipe_request/2)
    {:reply, :ok, new_state}
  end

  # Take the request on top of the storage (the most recently stored one).
  def handle_call(:pop, _from, state) do
    %Worker{requests: requests, count: cnt} = state

    # The counter is reset to 0 whenever the storage becomes empty, so it
    # cannot drift away from the actual list length once drained.
    {request, rest, new_cnt} =
      case requests do
        [] -> {nil, [], 0}
        [request] -> {request, [], 0}
        [request | rest] -> {request, rest, cnt - 1}
      end

    {:reply, request, %Worker{state | requests: rest, count: new_cnt}}
  end

  def handle_call(:stats, _from, state) do
    {:reply, {:stored_requests, state.count}, state}
  end

  def handle_call(:requests, _from, state) do
    {:reply, {:requests, state.requests}, state}
  end

  # Synchronous call to the worker. Any failure (e.g. the worker is not alive
  # or the call times out) is caught and logged; in that case the result of
  # `Logger.error/1` is returned instead of a reply.
  # NOTE(review): callers cannot tell this apart from a real reply — consider
  # returning an explicit error tuple once all callers can handle it.
  defp do_call(pid, command) do
    GenServer.call(pid, command)
  catch
    error, reason ->
      Logger.error(Exception.format(error, reason, __STACKTRACE__))
  end

  # Pipes one request through its middlewares. When a middleware drops it
  # (`false`), only the updated state is kept; otherwise the processed request
  # is pushed on top of the storage. Both the counter and the list are taken
  # from the state returned by the middlewares, so any changes they make to it
  # are preserved.
  defp pipe_request(request, state) do
    case Crawly.Utils.pipe(request.middlewares, request, state) do
      {false, new_state} ->
        new_state

      {new_request, new_state} ->
        %{
          new_state
          | count: new_state.count + 1,
            requests: [new_request | new_state.requests]
        }
    end
  end
end