lib/crawly/manager.ex

defmodule Crawly.Manager do
  @moduledoc """
  Crawler manager module

  This module is responsible for spawning all processes related to
  a given crawler.

  The manager spawns the following process tree:
  ┌────────────────┐        ┌───────────────────┐
  │ Crawly.Manager ├────────> Crawly.ManagerSup │
  └────────────────┘        └─────────┬─────────┘
           │                          │
           │                          │
           ┌──────────────────────────┤
           │                          │
           │                          │
  ┌────────▼───────┐        ┌─────────▼───────┐
  │    Worker1     │        │    Worker2      │
  └────────┬───────┘        └────────┬────────┘
           │                         │
           │                         │
           │                         │
           │                         │
  ┌────────▼─────────┐    ┌──────────▼───────────┐
  │Crawly.DataStorage│    │Crawly.RequestsStorage│
  └──────────────────┘    └──────────────────────┘
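
  A manager is normally started indirectly, through the engine (`MySpider`
  below is a placeholder for an actual spider module):

      Crawly.Engine.start_spider(MySpider)

  Additional workers can be attached to an already running spider:

      Crawly.Manager.add_workers(MySpider, 2)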
  """
  use GenServer

  require Logger

  alias Crawly.{Engine, Utils}

  # Interval (ms) between two periodic :operations runs
  @timeout 60_000

  # Number of start requests stored synchronously; the remaining requests
  # are stored from an async task
  @start_request_split_size 50

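  @doc """
  Adds `num_of_workers` workers to an already running spider.

  Returns `{:error, :spider_non_exist}` if no manager is registered for the
  given spider in `Crawly.Engine`.

  A minimal usage sketch (`MySpider` is a placeholder for an actual spider
  module):

      Crawly.Manager.add_workers(MySpider, 2)
      #=> :ok
  """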
  @spec add_workers(module(), non_neg_integer()) ::
          :ok | {:error, :spider_non_exist}
  def add_workers(spider_name, num_of_workers) do
    case Engine.get_manager(spider_name) do
      {:error, reason} ->
        {:error, reason}

      pid ->
        GenServer.cast(pid, {:add_workers, num_of_workers})
    end
  end

  def start_link([spider_name, options]) do
    Logger.debug("Starting the manager for #{inspect(spider_name)}")
    GenServer.start_link(__MODULE__, [spider_name, options])
  end

  @impl true
  def init([spider_name, options]) do
    crawl_id = Keyword.get(options, :crawl_id)
    Logger.metadata(spider_name: spider_name, crawl_id: crawl_id)

    itemcount_limit =
      Keyword.get(
        options,
        :closespider_itemcount,
        get_default_limit(:closespider_itemcount, spider_name)
      )

    closespider_timeout_limit =
      Keyword.get(
        options,
        :closespider_timeout,
        get_default_limit(:closespider_timeout, spider_name)
      )

    # Start DataStorage worker
    {:ok, data_storage_pid} =
      Crawly.DataStorage.start_worker(spider_name, crawl_id)

    Process.link(data_storage_pid)

    # Start RequestsStorage worker for the given spider
    {:ok, request_storage_pid} =
      Crawly.RequestsStorage.start_worker(spider_name, crawl_id)

    Process.link(request_storage_pid)

    # Start workers
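    # Workers are spawned under the DynamicSupervisor registered under the
    # spider module's name (see the supervision tree in the moduledoc)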
    num_workers =
      Keyword.get(
        options,
        :concurrent_requests_per_domain,
        Utils.get_settings(:concurrent_requests_per_domain, spider_name, 4)
      )

    worker_pids =
      Enum.map(1..num_workers, fn _x ->
        DynamicSupervisor.start_child(
          spider_name,
          {Crawly.Worker, [spider_name: spider_name, crawl_id: crawl_id]}
        )
      end)

    # Schedule periodic service operations for the given spider manager
    timeout =
      Utils.get_settings(:manager_operations_timeout, spider_name, @timeout)

    tref = Process.send_after(self(), :operations, timeout)

    Logger.debug(
      "Started #{Enum.count(worker_pids)} workers for #{inspect(spider_name)}"
    )

    {:ok,
     %{
       name: spider_name,
       crawl_id: crawl_id,
       itemcount_limit: itemcount_limit,
       closespider_timeout_limit: closespider_timeout_limit,
       tref: tref,
       prev_scraped_cnt: 0,
       workers: worker_pids
     }, {:continue, {:startup, options}}}
  end

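  # The spider's init/1 callback returns a keyword list which may contain
  # :start_requests (ready-made request structs) and/or :start_urls (plain
  # URLs that are converted into requests below).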
  @impl true
  def handle_continue({:startup, options}, state) do
    # Add start requests to the requests storage
    init = state.name.init(options)

    start_requests_from_req = Keyword.get(init, :start_requests, [])

    start_requests_from_urls =
      init
      |> Keyword.get(:start_urls, [])
      |> Crawly.Utils.requests_from_urls()

    start_requests = start_requests_from_req ++ start_requests_from_urls

    # Split the start requests so the first batch is stored synchronously
    # and the remainder is stored from an async task
    {start_reqs, async_start_reqs} =
      Enum.split(start_requests, @start_request_split_size)

    :ok = Crawly.RequestsStorage.store(state.name, start_reqs)

    Task.start(fn ->
      Crawly.RequestsStorage.store(state.name, async_start_reqs)
    end)

    {:noreply, state}
  end

  @impl true
  def handle_cast({:add_workers, num_of_workers}, state) do
    Logger.info("Adding #{num_of_workers} workers for #{inspect(state.name)}")

    Enum.each(1..num_of_workers, fn _ ->
      DynamicSupervisor.start_child(
        state.name,
        {Crawly.Worker, [spider_name: state.name, crawl_id: state.crawl_id]}
      )
    end)

    {:noreply, state}
  end

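  # Periodic housekeeping: compare the current item count with the previous
  # tick to estimate crawl speed, log the stats and stop the spider when one
  # of the configured limits has been reached.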
  @impl true
  def handle_info(:operations, state) do
    Process.cancel_timer(state.tref)

    # Collect the current item and request counts to evaluate the stop
    # conditions below.
    {:stored_items, items_count} = Crawly.DataStorage.stats(state.name)

    {:stored_requests, requests_count} =
      Crawly.RequestsStorage.stats(state.name)

    delta = items_count - state.prev_scraped_cnt

    Logger.info("Current crawl speed is: #{delta} items/min")
    Logger.info("Current requests count is: #{requests_count}")

    maybe_stop_spider_by_requests_count_and_delta(
      state.name,
      requests_count,
      delta
    )

    maybe_stop_spider_by_itemcount_limit(
      state.name,
      items_count,
      state.itemcount_limit
    )

    # Stop the spider if it is not scraping items fast enough
    maybe_stop_spider_by_timeout(
      state.name,
      delta,
      state.closespider_timeout_limit
    )

    tref =
      Process.send_after(
        self(),
        :operations,
        Utils.get_settings(:manager_operations_timeout, state.name, @timeout)
      )

    {:noreply, %{state | tref: tref, prev_scraped_cnt: items_count}}
  end

  defp maybe_stop_spider_by_requests_count_and_delta(
         spider_name,
         requests_count,
         delta
       )
       when requests_count == 0 and delta == 0 do
    Logger.info("Stopping #{inspect(spider_name)}, all requests handled")

    Crawly.Engine.stop_spider(spider_name, :spider_finished)
  end

  defp maybe_stop_spider_by_requests_count_and_delta(_, _, _), do: :ok

  defp maybe_stop_spider_by_itemcount_limit(
         spider_name,
         current,
         limit
       )
       when current >= limit do
    Logger.info(
      "Stopping #{inspect(spider_name)}, closespider_itemcount achieved"
    )

    Crawly.Engine.stop_spider(spider_name, :itemcount_limit)
  end

  defp maybe_stop_spider_by_itemcount_limit(_, _, _), do: :ok

  # Stop the spider when the number of items scraped since the previous
  # operations tick is at or below the closespider_timeout limit.
  defp maybe_stop_spider_by_timeout(spider_name, current, limit)
       when current <= limit and is_integer(limit) do
    Logger.info(
      "Stopping #{inspect(spider_name)}, closespider_timeout achieved"
    )

    Crawly.Engine.stop_spider(spider_name, :itemcount_timeout)
  end

  defp maybe_stop_spider_by_timeout(_, _, _), do: :ok

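  # Limits may be configured as integers, strings or atoms. Atoms are passed
  # through untouched: an integer always compares lower than an atom in
  # Erlang term ordering, so an atom limit never triggers the stop checks
  # above.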
  defp maybe_convert_to_integer(value) when is_atom(value), do: value

  defp maybe_convert_to_integer(value) when is_binary(value),
    do: String.to_integer(value)

  defp maybe_convert_to_integer(value) when is_integer(value), do: value

  # Get the closespider_itemcount or closespider_timeout limit from the
  # global config or the spider's settings.
  defp get_default_limit(limit_name, spider_name) do
    limit_name
    |> Utils.get_settings(spider_name)
    |> maybe_convert_to_integer()
  end
end