# lib/crawly/requests_storage/requests_storage.ex

defmodule Crawly.RequestsStorage do
  @moduledoc """
  Request storage, a module responsible for storing urls for crawling

                 ┌──────────────────┐
                 │                  │             ┌------------------┐
                 │ RequestsStorage  <─────────────┤ From crawlers1,2 │
                 │                  │             └------------------┘
                 └─────────┬────────┘
                           │
                           │
                           │
                           │
              ┌────────────▼─────────────────┐
              │                              │
              │                              │
              │                              │
  ┌───────────▼──────────┐       ┌───────────▼──────────┐
  │RequestsStorageWorker1│       │RequestsStorageWorker2│
  │      (Crawler1)      │       │      (Crawler2)      │
  └──────────────────────┘       └──────────────────────┘

  All requests are going through one RequestsStorage process, which
  quickly finds the actual worker, which finally stores the request
  afterwards.
  """
  require Logger

  use GenServer

  # workers:     spider_name => worker pid
  # pid_spiders: worker pid => spider_name (reverse index, used for :DOWN cleanup)
  defstruct workers: %{}, pid_spiders: %{}

  alias Crawly.RequestsStorage

  # Large request lists are split into chunks of this size so a single
  # GenServer.call never carries an unbounded payload.
  @batch_call_max_count 50

  @doc """
  Store individual request or multiple requests in related child worker
  """
  @spec store(Crawly.spider(), Crawly.Request.t() | [Crawly.Request.t()]) ::
          :ok | {:error, :storage_worker_not_running}
  def store(spider_name, %Crawly.Request{} = request),
    do: GenServer.call(__MODULE__, {:store, {spider_name, [request]}})

  def store(spider_name, requests) when is_list(requests) do
    requests
    |> Stream.chunk_every(@batch_call_max_count)
    |> Enum.each(&GenServer.call(__MODULE__, {:store, {spider_name, &1}}))
  end

  def store(_spider_name, request) do
    Logger.error("#{inspect(request)} does not seem to be a request. Ignoring.")
    {:error, :not_request}
  end

  @doc """
  Pop a request out of requests storage
  """
  @spec pop(Crawly.spider()) ::
          nil | Crawly.Request.t() | {:error, :storage_worker_not_running}
  def pop(spider_name) do
    GenServer.call(__MODULE__, {:pop, spider_name})
  end

  @doc """
  Get statistics from the requests storage
  """
  @spec stats(Crawly.spider()) ::
          {:stored_requests, non_neg_integer()}
          | {:error, :storage_worker_not_running}
  def stats(spider_name) do
    GenServer.call(__MODULE__, {:stats, spider_name})
  end

  @doc """
  Get all requests currently stored for the given spider
  """
  @spec requests(Crawly.spider()) ::
          {:requests, [Crawly.Request.t()]}
          | {:error, :storage_worker_not_running}
  def requests(spider_name) do
    GenServer.call(__MODULE__, {:requests, spider_name})
  end

  @doc """
  Starts a worker for a given spider
  """
  @spec start_worker(Crawly.spider(), crawl_id :: String.t()) ::
          {:ok, pid()} | {:error, :already_started}
  def start_worker(spider_name, crawl_id) do
    GenServer.call(__MODULE__, {:start_worker, spider_name, crawl_id})
  end

  def start_link([]) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  @impl true
  def init(_args) do
    {:ok, %RequestsStorage{}}
  end

  @impl true
  def handle_call({:store, {spider_name, requests}}, _from, state) do
    msg =
      with_worker(state.workers, spider_name, fn pid ->
        Crawly.RequestsStorage.Worker.store(pid, requests)
      end)

    {:reply, msg, state}
  end

  def handle_call({:pop, spider_name}, _from, state) do
    resp =
      with_worker(state.workers, spider_name, fn pid ->
        Crawly.RequestsStorage.Worker.pop(pid)
      end)

    {:reply, resp, state}
  end

  def handle_call({:stats, spider_name}, _from, state) do
    msg =
      with_worker(state.workers, spider_name, fn pid ->
        Crawly.RequestsStorage.Worker.stats(pid)
      end)

    {:reply, msg, state}
  end

  def handle_call({:requests, spider_name}, _from, state) do
    msg =
      with_worker(state.workers, spider_name, fn pid ->
        Crawly.RequestsStorage.Worker.requests(pid)
      end)

    {:reply, msg, state}
  end

  def handle_call({:start_worker, spider_name, crawl_id}, _from, state) do
    case Map.get(state.workers, spider_name) do
      nil ->
        {:ok, pid} =
          DynamicSupervisor.start_child(
            Crawly.RequestsStorage.WorkersSup,
            %{
              id: :undefined,
              restart: :temporary,
              start:
                {Crawly.RequestsStorage.Worker, :start_link,
                 [spider_name, crawl_id]}
            }
          )

        # Monitor the worker so the :DOWN handler below can drop it
        # from both lookup maps when it terminates.
        Process.monitor(pid)

        new_state = %{
          state
          | workers: Map.put(state.workers, spider_name, pid),
            pid_spiders: Map.put(state.pid_spiders, pid, spider_name)
        }

        {:reply, {:ok, pid}, new_state}

      _pid ->
        # Bug fix: previously replied with `state.workers` as the new state,
        # which clobbered the %RequestsStorage{} struct on duplicate starts
        # and broke all subsequent calls. Keep the state unchanged.
        {:reply, {:error, :already_started}, state}
    end
  end

  # Clean up worker: remove a dead worker from both lookup maps.
  @impl true
  def handle_info({:DOWN, _ref, :process, pid, _}, state) do
    spider_name = Map.get(state.pid_spiders, pid)
    new_pid_spiders = Map.delete(state.pid_spiders, pid)
    new_workers = Map.delete(state.workers, spider_name)
    new_state = %{state | workers: new_workers, pid_spiders: new_pid_spiders}

    {:noreply, new_state}
  end

  # Looks up the storage worker registered for `spider_name` and applies `fun`
  # to its pid; returns {:error, :storage_worker_not_running} when absent.
  defp with_worker(workers, spider_name, fun) do
    case Map.get(workers, spider_name) do
      nil -> {:error, :storage_worker_not_running}
      pid -> fun.(pid)
    end
  end
end