lib/crawly/data_storage/data_storage.ex

defmodule Crawly.DataStorage do
  @moduledoc """
  Data Storage, is a module responsible for storing crawled items.
  On the high level it's possible to represent the architecture of items
  storage this way:


                 ┌──────────────────┐
                 │                  │             ┌------------------┐
                 │   DataStorage    <─────────────┤ From crawlers1,2 │
                 │                  │             └------------------┘
                 └─────────┬────────┘
                           │
                           │
                           │
                           │
              ┌────────────▼─────────────────┐
              │                              │
              │                              │
              │                              │
  ┌───────────▼──────────┐       ┌───────────▼──────────┐
  │  DataStorageWorker1  │       │   DataStorageWorker2 │
  │      (Crawler1)      │       │      (Crawler2)      │
  └──────────────────────┘       └──────────────────────┘
  """
  require Logger

  use GenServer

  defstruct workers: %{}, pid_spiders: %{}

  def start_worker(spider_name, crawl_id) do
    GenServer.call(__MODULE__, {:start_worker, spider_name, crawl_id})
  end

  @spec store(atom(), map()) :: :ok
  def store(spider, item) do
    GenServer.call(__MODULE__, {:store, spider, item})
  end

  def stats(spider) do
    GenServer.call(__MODULE__, {:stats, spider})
  end

  @spec inspect(atom(), term()) ::
          {:error, :data_storage_worker_not_running} | term()
  def inspect(spider, field) do
    GenServer.call(__MODULE__, {:inspect, spider, field})
  end

  def start_link([]) do
    Logger.debug("Starting data storage")

    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_args) do
    {:ok, %Crawly.DataStorage{workers: %{}, pid_spiders: %{}}}
  end

  def handle_call({:store, spider, item}, _from, state) do
    %{workers: workers} = state

    message =
      case Map.get(workers, spider) do
        nil ->
          {:error, :data_storage_worker_not_running}

        pid ->
          Crawly.DataStorage.Worker.store(pid, item)
      end

    {:reply, message, state}
  end

  def handle_call({:start_worker, spider_name, crawl_id}, _from, state) do
    {msg, new_state} =
      case Map.get(state.workers, spider_name) do
        nil ->
          pid = do_start_worker(spider_name, crawl_id)

          new_workers = Map.put(state.workers, spider_name, pid)
          new_spider_pids = Map.put(state.pid_spiders, pid, spider_name)

          new_state = %Crawly.DataStorage{
            state
            | workers: new_workers,
              pid_spiders: new_spider_pids
          }

          {{:ok, pid}, new_state}

        _ ->
          {{:error, :already_started}, state}
      end

    {:reply, msg, new_state}
  end

  def handle_call({:stats, spider_name}, _from, state) do
    msg =
      case Map.get(state.workers, spider_name) do
        nil ->
          {:error, :data_storage_worker_not_running}

        pid ->
          Crawly.DataStorage.Worker.stats(pid)
      end

    {:reply, msg, state}
  end

  def handle_call({:inspect, spider_name, field}, _from, state) do
    msg =
      case Map.get(state.workers, spider_name) do
        nil ->
          {:error, :data_storage_worker_not_running}

        pid ->
          Crawly.DataStorage.Worker.inspect(pid, field)
      end

    {:reply, msg, state}
  end

  # Clean up worker
  def handle_info({:DOWN, _ref, :process, pid, _}, state) do
    spider_name = Map.get(state.pid_spiders, pid)
    new_pid_spiders = Map.delete(state.pid_spiders, pid)
    new_workers = Map.delete(state.workers, spider_name)
    new_state = %{state | workers: new_workers, pid_spiders: new_pid_spiders}

    {:noreply, new_state}
  end

  defp do_start_worker(spider_name, crawl_id) do
    {:ok, pid} =
      DynamicSupervisor.start_child(
        Crawly.DataStorage.WorkersSup,
        %{
          id: :undefined,
          restart: :temporary,
          start:
            {Crawly.DataStorage.Worker, :start_link,
             [[spider_name: spider_name, crawl_id: crawl_id]]}
        }
      )

    Process.monitor(pid)
    pid
  end
end