defmodule Strom.GenMix.Consumer do
@moduledoc """
Consumer is used by the generic GenMix component.
"""
use GenServer
defstruct pid: nil,
mix_pid: nil,
running: false,
client: nil,
name: nil,
fun: nil,
data: []
def start({name, fun}, mix_pid, opts \\ []) when is_list(opts) do
state = %__MODULE__{mix_pid: mix_pid, name: name, fun: fun, running: true}
{:ok, pid} = GenServer.start_link(__MODULE__, state)
__state__(pid)
end
@impl true
def init(%__MODULE__{} = cons) do
{:ok, %{cons | pid: self()}}
end
def call(cons) do
Stream.resource(
fn ->
GenServer.call(cons.pid, :register_client)
end,
fn cons ->
case GenServer.call(cons.pid, :get_data, :infinity) do
{:ok, data} ->
if length(data) == 0 do
receive do
:continue ->
flush()
end
end
{data, cons}
{:error, :done} ->
{:halt, cons}
end
end,
fn cons -> cons end
)
end
def stop(cons) do
GenServer.call(cons.pid, :stop)
end
defp flush do
receive do
_ -> flush()
after
0 -> :ok
end
end
def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)
@impl true
def handle_call(:get_data, _from, cons) do
if length(cons.data) == 0 and !cons.running do
{:reply, {:error, :done}, cons}
else
data = cons.data
cons = %{cons | data: []}
GenServer.cast(cons.mix_pid, {:consumer_got_data, {cons.name, cons.fun}})
{:reply, {:ok, data}, cons}
end
end
def handle_call(:register_client, {pid, _ref}, cons) do
cons = %{cons | client: pid}
{:reply, cons, cons}
end
def handle_call(:__state__, _from, cons), do: {:reply, cons, cons}
@impl true
def handle_cast({:put_data, new_data}, cons) do
{new_data, _} = Enum.split_with(new_data, cons.fun)
cons = %{cons | data: cons.data ++ new_data}
{:noreply, cons}
end
def handle_cast(:continue, cons) do
if cons.client do
send(cons.client, :continue)
end
{:noreply, cons}
end
def handle_cast(:stop, cons) do
cons = %{cons | running: false}
{:noreply, cons}
end
end