defmodule Mosql.ChangeStreamProducer do
use GenStage
require Logger
alias Mosql.ChangeStreamEvent
@behaviour Broadway.Producer
# Name of the MongoDB collection whose change stream is watched (passed to
# Mongo.watch_collection in get_stream_cursor/2).
@collection "users"
# Registered name of the DynamicSupervisor declared in prepare_for_start/2;
# connect_mongo/1 starts the Mongo connection process under it.
@supervisor_name Mosql.DynamicSupervisor
@doc """
Invoked once by Broadway during `Broadway.start_link/2`.

See https://hexdocs.pm/broadway/Broadway.Producer.html#c:prepare_for_start/2

The producer entry of `broadway_opts` is expected to look like

    producer: [
      module: {producer_module, mongo_opts},
      concurrency: 1
    ]

where `mongo_opts` is the keyword list used to start the Mongo connection.

The callback rewrites the producer arguments to
`[last_resume_token: nil, mongo_opts: mongo_opts]`, after registering the
connection under the name `:mongo_broadway`, which is the shape `init/1`
expects.

Returns `{child_specs, broadway_opts}`. The single child spec is the
`DynamicSupervisor` under which the Mongo connection process is started
later on.
"""
@impl true
def prepare_for_start(_module, broadway_opts) do
  Logger.info("prepare for start with broadway options #{inspect(broadway_opts)}")

  {producer_module, mongo_opts} = get_in(broadway_opts, [:producer, :module])

  producer_args = [
    last_resume_token: nil,
    mongo_opts: Keyword.put(mongo_opts, :name, :mongo_broadway)
  ]

  supervisor_spec = {DynamicSupervisor, strategy: :one_for_one, name: @supervisor_name}

  {
    [supervisor_spec],
    put_in(broadway_opts, [:producer, :module], {producer_module, producer_args})
  }
end
# Builds the initial producer state from the arguments prepared in
# prepare_for_start/2 and schedules the first :connect message.
#
# Both :last_resume_token and :mongo_opts are mandatory; Keyword.fetch!/2
# raises when either one is missing.
@impl true
def init(args) do
  Logger.debug("args from broadway pipeline start: #{inspect(args)}")

  last_resume_token = Keyword.fetch!(args, :last_resume_token)
  mongo_opts = Keyword.fetch!(args, :mongo_opts)
  initial_state = %{last_resume_token: last_resume_token, mongo_opts: mongo_opts}

  Logger.info("init mongo db change streamer with state: #{inspect(initial_state)}")

  # The connection is established asynchronously, half a second after init.
  producer_pid = self()
  Process.send_after(producer_pid, :connect, 500)
  Logger.info("producer process id #{inspect(producer_pid)}")

  {:producer, initial_state}
end
@doc """
Handles the (re)connect timer and the monitor notifications.

  * `:connect` - makes sure a Mongo connection process exists (see
    `connect_mongo/1`), monitors it and starts a monitored background process
    that consumes the change stream and casts every event to this producer.
    Anything left over from a previous session is released first, so monitors
    and cursor processes never accumulate across reconnects.

  * `{:DOWN, ref, :process, pid, reason}` - when the tracked connection or
    cursor process goes down, the rest of the session is released and exactly
    one `:connect` is scheduled 3 seconds later. Notifications whose monitor
    reference is not tracked (any more) are ignored.

No events are emitted from here; every clause returns `{:noreply, [], state}`.
"""
@impl true
def handle_info(:connect, state) do
  state = teardown_session(state)

  mongo_pid = connect_mongo(state)
  mongo_ref = Process.monitor(mongo_pid)
  main_pid = self()

  # Enumerating the change stream blocks while waiting for events, so it runs
  # in its own process. spawn_monitor/1 starts and monitors it atomically.
  {cursor_pid, cursor_ref} =
    spawn_monitor(fn ->
      Enum.each(get_stream_cursor(main_pid, state), &new_change_stream_event(main_pid, &1))
    end)

  Logger.info("watching change streams in a background process #{inspect(cursor_pid)}")

  state =
    Map.merge(state, %{
      mongo_pid: mongo_pid,
      mongo_ref: mongo_ref,
      cursor_pid: cursor_pid,
      cursor_ref: cursor_ref
    })

  {:noreply, [], state}
end

# The repeated `ref` in the head only matches when the notification belongs to
# the monitor currently tracked for the Mongo connection process.
@impl true
def handle_info({:DOWN, ref, :process, pid, reason}, %{mongo_ref: ref} = state) do
  Logger.info(":DOWN signal received for pid: #{inspect(pid)}")

  Logger.info(
    ":DOWN signal received from mongo process: #{inspect(reason)}. retrying connecting again"
  )

  {:noreply, [], schedule_reconnect(state)}
end

# Same as above, for the monitor of the cursor process.
@impl true
def handle_info({:DOWN, ref, :process, pid, reason}, %{cursor_ref: ref} = state) do
  Logger.info(":DOWN signal received for pid: #{inspect(pid)}")

  Logger.info(
    ":DOWN signal received from mongo cursor process: #{inspect(reason)}. retrying connecting again"
  )

  {:noreply, [], schedule_reconnect(state)}
end

# A notification for a monitor that is not part of the current session must
# not trigger another reconnect.
@impl true
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
  Logger.info(":DOWN signal received for pid: #{inspect(pid)}")
  Logger.debug("ignoring :DOWN signal of untracked process #{inspect(pid)}")
  {:noreply, [], state}
end

# Releases the current session and schedules a single :connect in 3 seconds.
# Because the other monitor is flushed in teardown_session/1, the second
# :DOWN that usually follows a crash cannot schedule a duplicate reconnect.
defp schedule_reconnect(state) do
  state = teardown_session(state)
  Process.send_after(self(), :connect, 3000)
  state
end

# Drops both monitors (flushing pending :DOWN messages), stops a cursor
# process that is still running and clears the session keys in the state.
# Works on a state that has no session keys yet (first :connect after init/1).
defp teardown_session(state) do
  drop_monitor(Map.get(state, :mongo_ref))
  drop_monitor(Map.get(state, :cursor_ref))
  stop_cursor(Map.get(state, :cursor_pid))

  Map.merge(state, %{mongo_pid: nil, mongo_ref: nil, cursor_pid: nil, cursor_ref: nil})
end

defp drop_monitor(nil), do: :ok

defp drop_monitor(ref) do
  Process.demonitor(ref, [:flush])
  :ok
end

defp stop_cursor(nil), do: :ok

defp stop_cursor(pid) do
  # No-op when the process is already dead.
  Process.exit(pid, :kill)
  :ok
end
# Demand from consumers is only logged. Events are not produced in response to
# demand; they are emitted from handle_cast/2 whenever the change stream
# delivers a document.
@impl true
def handle_demand(demand, state) do
  Logger.info("change streamer received more demand #{demand}")

  no_events = []
  {:noreply, no_events, state}
end
# Casts sent by the cursor process (see new_token/2 and
# new_change_stream_event/2).
#
# {:new_resume_token, token} replaces :last_resume_token in the state and
# emits nothing.
@impl true
def handle_cast({:new_resume_token, token}, state) do
  Logger.debug("storing new resume token #{inspect(token)} in the state")

  updated_state = Map.replace!(state, :last_resume_token, token)
  {:noreply, [], updated_state}
end

# {:new_change_stream_event, raw_event} parses the raw event and emits the
# result as a single event; the state is left untouched.
@impl true
def handle_cast({:new_change_stream_event, raw_change_stream_event}, state) do
  parsed_event = ChangeStreamEvent.parse_message(raw_change_stream_event)
  Logger.info("send new document received to the processor")

  {:noreply, [parsed_event], state}
end
# Resume token callback handed to Mongo.watch_collection: extracts the "_data"
# field of the token document and casts it to the producer, which stores it in
# its state.
defp new_token(producer, %{"_data" => token}) do
  Logger.debug("new resume token received: #{token}")

  cast_message = {:new_resume_token, token}
  GenStage.cast(producer, cast_message)
end
# Forwards one raw change stream event to the producer process, which parses
# it and emits it from handle_cast/2.
defp new_change_stream_event(producer, raw_event_data) do
  Logger.info("new change stream event received")
  Logger.debug(inspect(raw_event_data))

  cast_message = {:new_change_stream_event, raw_event_data}
  GenStage.cast(producer, cast_message)
end
# Opens the change stream on @collection and returns the enumerable of change
# events. Runs inside the cursor process; `parent` is the producer pid that
# receives the resume tokens (see new_token/2).
#
# Without a stored resume token the stream starts from the current position.
defp get_stream_cursor(parent, %{last_resume_token: nil}) do
  Logger.info("watch the collection without resume token")
  open_change_stream(parent, [])
end

# With a stored resume token the stream is resumed right after that token.
defp get_stream_cursor(parent, %{last_resume_token: resume_token}) do
  Logger.info("watch the collection using last resume token: #{inspect(resume_token)}")
  open_change_stream(parent, resume_after: %{"_data" => resume_token})
end

# Shared call into the driver. Returns the enumerable on success. On an error
# tuple the calling (cursor) process exits with a descriptive reason, so the
# producer sees the real cause in its :DOWN message instead of an unrelated
# enumeration failure on a non-enumerable value.
defp open_change_stream(parent, extra_opts) do
  opts = [full_document: "updateLookup", max_time: 2_000] ++ extra_opts

  case Mongo.watch_collection(:mongo_broadway, @collection, [], &new_token(parent, &1), opts) do
    {:error, %{code: 40573} = error} ->
      # NOTE(review): 40573 is presumably the server error for "$changeStream
      # is only supported on replica sets" - confirm against the server docs.
      Logger.error("change streams are not available on this deployment: #{inspect(error)}")
      exit({:change_stream_unsupported, error})

    {:error, reason} ->
      Logger.error("failed to open the change stream: #{inspect(reason)}")
      exit({:change_stream_failed, reason})

    cursor ->
      cursor
  end
end
# Starts the Mongo connection process under the dynamic supervisor, or reuses
# the one that is already running, and returns its pid. Any other start result
# is not handled and raises.
defp connect_mongo(state) do
  child_spec = {Mongo, state.mongo_opts}

  @supervisor_name
  |> DynamicSupervisor.start_child(child_spec)
  |> case do
    {:ok, started_pid} ->
      Logger.info("mongo connection process #{inspect(started_pid)}")
      started_pid

    {:error, {:already_started, existing_pid}} ->
      Logger.info("using existing mongo connection process #{inspect(existing_pid)}")
      existing_pid
  end
end
end