defmodule Crawly.DataStorage.Worker do
  @moduledoc """
  A worker process which stores items for an individual spider.

  Every item passed to `store/2` is run through the spider's item pipelines
  (the `:pipelines` setting). Pipelines receive this process's state and may
  keep their own data in it for their internal needs (persistence).

  The worker itself does not write anything to the filesystem; it expects the
  pipelines to do that work.
  """
  alias Crawly.DataStorage.Worker
  require Logger
  use GenServer

  defstruct stored_items: 0, spider_name: nil, crawl_id: nil

  @doc """
  Starts a data storage worker for the given spider and crawl.

  Expects a keyword list containing `:spider_name` and `:crawl_id`. The keys
  may appear in any order; other keys are ignored. Raises `KeyError` if either
  required key is missing.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) when is_list(opts) do
    spider_name = Keyword.fetch!(opts, :spider_name)
    crawl_id = Keyword.fetch!(opts, :crawl_id)

    # init/1 matches this exact key order, so always build it here.
    GenServer.start_link(__MODULE__, spider_name: spider_name, crawl_id: crawl_id)
  end

  @doc """
  Returns how many items have been stored, i.e. items for which the pipeline
  run did not return `false`.
  """
  @spec stats(pid()) :: {:stored_items, non_neg_integer()}
  def stats(pid), do: GenServer.call(pid, :stats)

  @doc """
  Asynchronously passes `item` through the spider's pipelines.

  Always returns `:ok` immediately (it is a cast).
  """
  @spec store(pid(), map()) :: :ok
  def store(pid, item), do: GenServer.cast(pid, {:store, item})

  @doc """
  Inspect the inner state of the given data worker.

  Returns `{:inspect, value}`, where `value` is the state's `field` entry, or
  `nil` when the state has no such key.
  """
  @spec inspect(pid(), atom()) :: {:inspect, term()}
  def inspect(pid, field), do: GenServer.call(pid, {:inspect, field})

  @impl true
  def init(spider_name: spider_name, crawl_id: crawl_id) do
    Logger.metadata(spider_name: spider_name, crawl_id: crawl_id)
    {:ok, %Worker{spider_name: spider_name, crawl_id: crawl_id}}
  end

  @impl true
  def handle_cast({:store, item}, state) do
    pipelines = Crawly.Utils.get_settings(:pipelines, state.spider_name, [])

    next_state =
      case Crawly.Utils.pipe(pipelines, item, state) do
        # The item was rejected: keep whatever state the pipelines produced,
        # but do not count the item.
        {false, pipeline_state} ->
          pipeline_state

        {new_item, pipeline_state} ->
          # Kernel.inspect/1 is spelled out because this module defines its
          # own public inspect/2.
          Logger.debug("Stored item: #{Kernel.inspect(new_item)}")
          # Increment the counter on the state the pipelines returned (the one
          # carried forward), not on the pre-pipeline state.
          %Worker{pipeline_state | stored_items: pipeline_state.stored_items + 1}
      end

    {:noreply, next_state}
  end

  @impl true
  def handle_call({:inspect, field}, _from, state) do
    {:reply, {:inspect, Map.get(state, field)}, state}
  end

  def handle_call(:stats, _from, state) do
    {:reply, {:stored_items, state.stored_items}, state}
  end
end