lib/loop.ex

defmodule Strom.Loop do
  @moduledoc false

  @timeout 5000
  @sleep 1

  defstruct name: nil,
            last_empty_call_at: nil,
            timeout: @timeout,
            sleep: @sleep

  def new(name, opts \\ []) do
    %__MODULE__{
      name: name,
      timeout: Keyword.get(opts, :timeout, @timeout),
      sleep: Keyword.get(opts, :sleep, @sleep)
    }
  end

  def start(%__MODULE__{name: name} = loop) do
    case Agent.start_link(fn -> [] end, name: name) do
      {:ok, _pid} ->
        loop

      {:error, {:already_started, _pid}} ->
        loop
    end
  end

  # call as a source
  def call(%__MODULE__{name: name} = loop) do
    Agent.get_and_update(name, fn data ->
      case data do
        [hd | tl] -> {hd, tl}
        [] -> {nil, []}
      end
    end)
    |> case do
      nil ->
        case loop.last_empty_call_at do
          nil ->
            Process.sleep(loop.sleep)
            {[], %{loop | last_empty_call_at: System.os_time(:millisecond)}}

          last_empty_call_at ->
            if System.os_time(:millisecond) - last_empty_call_at > loop.timeout do
              {:halt, loop}
            else
              {[], loop}
            end
        end

      datum ->
        {[datum], %{loop | last_empty_call_at: nil}}
    end
  end

  # call as a sink
  def call(%__MODULE__{name: name} = loop, data) do
    :ok = Agent.update(name, fn prev_data -> prev_data ++ [data] end)
    loop
  end

  def stop(%__MODULE__{name: name} = loop) do
    :ok = Agent.stop(name)
    loop
  end
end