defmodule Strom.Source do
@moduledoc """
Produces stream of events.
## Example with Enumerable
iex> alias Strom.Source
iex> source = :numbers |> Source.new([1, 2, 3]) |> Source.start()
iex> %{numbers: stream} = Source.call(%{}, source)
iex> Enum.to_list(stream)
[1, 2, 3]
## Example with file
iex> alias Strom.{Source, Source.ReadLines}
iex> source = :numbers |> Source.new(ReadLines.new("test/data/numbers1.txt")) |> Source.start()
iex> %{numbers: stream} = Source.call(%{}, source)
iex> Enum.to_list(stream)
["1", "2", "3", "4", "5"]
## If two sources are applied to one stream, the streams will be concatenated (Stream.concat/2)
iex> alias Strom.{Source, Source.ReadLines}
iex> source1 = :numbers |> Source.new([1, 2, 3]) |> Source.start()
iex> source2 = :numbers |> Source.new(ReadLines.new("test/data/numbers1.txt")) |> Source.start()
iex> %{numbers: stream} = %{} |> Source.call(source1) |> Source.call(source2)
iex> Enum.to_list(stream)
[1, 2, 3, "1", "2", "3", "4", "5"]
Source defines a `@behaviour`. One can easily implement their own sources.
See `Strom.Source.ReadLines`, `Strom.Source.Events`, `Strom.Source.IOGets`
"""
@callback start(map) :: map
@callback call(map) :: {:ok, {[term], map}} | {:error, {:halt, map}}
@callback stop(map) :: map
@callback infinite?(map) :: true | false
use GenServer
@buffer 1000
defstruct origin: nil,
name: nil,
pid: nil,
data: [],
opts: [],
task: nil,
waiting_client: nil,
buffer: @buffer
@type t() :: %__MODULE__{}
@type event() :: any()
@spec new(Strom.stream_name(), struct() | [event()], list()) :: __MODULE__.t()
def new(name, origin, opts \\ [])
when is_struct(origin) or (is_list(origin) and is_list(opts)) do
%__MODULE__{origin: origin, name: name, opts: opts}
end
@spec start(__MODULE__.t()) :: __MODULE__.t()
def start(%__MODULE__{origin: list, opts: opts} = source) when is_list(list) do
start(%{
source
| origin: Strom.Source.Events.new(list),
buffer: Keyword.get(opts, :buffer, @buffer)
})
end
def start(%__MODULE__{origin: origin, opts: opts} = source) when is_struct(origin) do
origin = apply(origin.__struct__, :start, [origin])
source = %{
source
| origin: origin,
buffer: Keyword.get(opts, :buffer, @buffer)
}
{:ok, pid} = start_link(source)
__state__(pid)
end
def start_link(%__MODULE__{} = source) do
GenServer.start_link(__MODULE__, source)
end
@impl true
def init(%__MODULE__{} = source), do: {:ok, %{source | pid: self()}}
@spec call(__MODULE__.t()) :: event()
def call(%__MODULE__{pid: pid}), do: GenServer.call(pid, :call, :infinity)
def infinite?(%__MODULE__{pid: pid}), do: GenServer.call(pid, :infinite)
@spec call(Strom.flow(), __MODULE__.t()) :: Strom.flow()
def call(flow, %__MODULE__{name: name} = source) when is_map(flow) do
:ok = GenServer.call(source.pid, :run_input)
stream =
Stream.resource(
fn ->
nil
end,
fn nil ->
case GenServer.call(source.pid, :get_data, :infinity) do
{:data, data} ->
{data, nil}
:done ->
{:halt, nil}
:pause ->
receive do
:continue_client ->
{[], nil}
end
end
end,
fn nil -> nil end
)
prev_stream = Map.get(flow, name, [])
Map.put(flow, name, Stream.concat(prev_stream, stream))
end
@spec stop(__MODULE__.t()) :: :ok
def stop(%__MODULE__{origin: origin, pid: pid}) do
apply(origin.__struct__, :stop, [origin])
GenServer.call(pid, :stop)
end
def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)
defp async_run_input(source) do
Task.Supervisor.async_nolink(Strom.TaskSupervisor, fn ->
loop_call(source)
end)
end
defp loop_call(source) do
case call_source(source) do
{:halt, _source} ->
:task_done
{events, source} ->
GenServer.cast(source.pid, {:new_data, events})
receive do
:continue_task ->
flush(:continue_task)
end
loop_call(source)
end
end
defp call_source(%__MODULE__{origin: origin} = source) do
case apply(origin.__struct__, :call, [origin]) do
{:ok, {events, origin}} ->
source = %{source | origin: origin}
{events, source}
{:error, {:halt, origin}} ->
source = %{source | origin: origin}
case apply(origin.__struct__, :infinite?, [origin]) do
true -> {[], source}
false -> {:halt, source}
end
end
end
@impl true
def handle_call(:run_input, _from, %__MODULE__{} = source) do
task = async_run_input(source)
{:reply, :ok, %{source | task: task}}
end
@impl true
def handle_call(:call, _from, %__MODULE__{} = source) do
{:reply, call_source(source), source}
end
def handle_call(:stop, _from, %__MODULE__{origin: origin} = source) do
origin = apply(origin.__struct__, :stop, [origin])
source = %{source | origin: origin}
{:stop, :normal, :ok, source}
end
def handle_call(:__state__, _from, source), do: {:reply, source, source}
def handle_call(:get_data, {pid, _ref}, source) do
if source.task do
send(source.task.pid, :continue_task)
end
data = source.data
cond do
length(data) == 0 and is_nil(source.task) ->
{:reply, :done, source}
length(data) == 0 ->
{:reply, :pause, %{source | waiting_client: pid}}
true ->
{:reply, {:data, data}, %{source | data: []}}
end
end
@impl true
def handle_cast({:new_data, chunk}, %__MODULE__{} = source) do
all_data = source.data ++ chunk
if source.waiting_client do
send(source.waiting_client, :continue_client)
end
if length(all_data) < source.buffer do
send(source.task.pid, :continue_task)
end
{:noreply, %{source | data: all_data, waiting_client: nil}}
end
@impl true
def handle_info({_task_ref, :task_done}, source) do
if source.waiting_client do
send(source.waiting_client, :continue_client)
end
{:noreply, %{source | task: nil, waiting_client: nil}}
end
def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, source) do
# do nothing for now
{:noreply, source}
end
def handle_info({:DOWN, _task_ref, :process, _task_pid, _not_normal}, source) do
task = async_run_input(source)
{:noreply, %{source | task: task}}
end
defp flush(message) do
receive do
^message ->
flush(message)
after
0 -> :ok
end
end
end