defmodule Immich.Sync do
@moduledoc """
Callback-driven sync pipeline for streaming, processing, and acknowledgements.
This module does not persist events directly. It requests events from the
configured event stream module, processes them chunk-by-chunk through an event
processor module, and acknowledges processed chunks in the same order.
"""
alias Immich.API.Session
defmodule EventStream do
@moduledoc "Behaviour for fetching sync events and acknowledging processed chunks."
alias Immich.API.Session
@type t :: module()
@typedoc "Options passed through to `c:sync_stream/3` and `c:sync_ack/3`."
@type opts :: keyword()
@doc "Returns a stream of raw event payload maps for the requested event types."
@callback sync_stream(Session.t(), [String.t()], opts()) ::
{:ok, Enumerable.t(map())} | {:error, term()}
@doc "Acknowledges event `ack` tokens after successful processing."
@callback sync_ack(Session.t(), [String.t()], opts()) ::
{:ok, map() | term()} | {:error, term()}
end
defmodule EventProcessor do
@moduledoc "Behaviour for processing sync event chunks."
@type t :: module()
@typedoc "Options passed through to `c:process_events/2`."
@type opts :: keyword()
@doc "Processes a homogeneous chunk of `%Immich.Sync.Event{}` values."
@callback process_events([Immich.Sync.Event.t()], opts()) ::
:ok | {:error, term()}
end
defmodule Event do
@moduledoc """
Represents a single sync event emitted by the sync stream.
"""
@enforce_keys [:type, :data, :ack]
defstruct [:type, :data, :ack]
@typedoc "A validated sync event parsed from a stream payload."
@type t :: %__MODULE__{
type: String.t(),
data: map(),
ack: String.t()
}
@doc "Builds an `%Immich.Sync.Event{}` from a raw API payload map."
@spec from_map(map()) :: {:ok, t()} | {:error, :invalid_event_payload}
def from_map(%{"type" => type, "data" => data, "ack" => ack})
when is_binary(type) and is_map(data) and is_binary(ack) do
{:ok, %__MODULE__{type: type, data: data, ack: ack}}
end
def from_map(_payload), do: {:error, :invalid_event_payload}
end
@typedoc "Options passed through to sync and processing callbacks."
@type run_opts :: [
batch_size: pos_integer(),
event_stream_opts: EventStream.opts(),
event_processor_opts: EventProcessor.opts()
]
@doc """
Runs one sync processing session with separate modules.
Events are consumed from `stream_handler_module.sync_stream/3`, converted to
`%Immich.Sync.Event{}`, batched, chunked by contiguous event type runs, then
processed by `event_processor_module.process_events/2` and acknowledged through
`stream_handler_module.sync_ack/3` per chunk in order.
"""
@spec run(Session.t(), [String.t()], EventStream.t(), EventProcessor.t(), run_opts()) ::
{:ok, %{String.t() => non_neg_integer()}} | {:error, term()}
def run(
%Session{} = session,
event_types,
stream_handler_module,
event_processor_module,
opts \\ []
) do
event_stream_opts = Keyword.get(opts, :event_stream_opts, [])
with :ok <- validate_run_inputs(stream_handler_module, event_processor_module),
{:ok, payload_stream} <-
stream_handler_module.sync_stream(session, event_types, event_stream_opts) do
process_stream(
payload_stream,
session,
stream_handler_module,
event_processor_module,
opts
)
end
end
defp validate_run_inputs(stream_handler_module, event_processor_module) do
cond do
not valid_stream_handler_module?(stream_handler_module) ->
{:error, {:invalid_input, :stream_handler_module}}
not valid_event_processor_module?(event_processor_module) ->
{:error, {:invalid_input, :event_processor_module}}
true ->
:ok
end
end
defp valid_stream_handler_module?(handler_module) do
Code.ensure_loaded?(handler_module) and
function_exported?(handler_module, :sync_stream, 3) and
function_exported?(handler_module, :sync_ack, 3)
end
defp valid_event_processor_module?(handler_module) do
Code.ensure_loaded?(handler_module) and
function_exported?(handler_module, :process_events, 2)
end
defp process_stream(
payload_stream,
session,
stream_handler_module,
event_processor_module,
opts
) do
batch_size = Keyword.get(opts, :batch_size, 100)
payload_stream
|> Stream.flat_map(&to_event/1)
|> Stream.chunk_every(batch_size)
|> Stream.flat_map(&contiguous_type_chunks/1)
|> Stream.map(fn chunk ->
process_and_ack_chunk(chunk, session, stream_handler_module, event_processor_module, opts)
end)
|> Enum.reduce_while({:ok, %{}}, fn
{:ok, type, count}, {:ok, totals} ->
{:cont, {:ok, Map.update(totals, type, count, &(&1 + count))}}
{:error, reason}, {:ok, _totals} ->
{:halt, {:error, reason}}
end)
end
defp process_and_ack_chunk(
chunk,
session,
stream_handler_module,
event_processor_module,
opts
) do
type = chunk_type(chunk)
event_processor_opts = Keyword.get(opts, :event_processor_opts, [])
events_stream_options = Keyword.get(opts, :event_stream_opts, [])
with :ok <- event_processor_module.process_events(chunk, event_processor_opts),
{:ok, _response} <-
stream_handler_module.sync_ack(session, chunk_acks(chunk), events_stream_options) do
{:ok, type, length(chunk)}
end
end
defp chunk_type([%Event{type: type} | _]), do: type
defp to_event(payload) do
case Event.from_map(payload) do
{:ok, event} -> [event]
{:error, _reason} -> []
end
end
defp chunk_acks(chunk), do: Enum.map(chunk, & &1.ack)
defp contiguous_type_chunks([]), do: []
# Build chunks with list prepends (O(1)) and reverse to restore order:
# - `current` is prepended while collecting contiguous same-type events,
# then reversed when the chunk is closed.
# - `chunks` is also prepended as chunks are completed, then reversed at end.
# This avoids repeated O(n) appends while preserving input ordering.
defp contiguous_type_chunks([first | rest]) do
{chunks_reversed, current_chunk} =
Enum.reduce(rest, {[], [first]}, fn event, {chunks, current} ->
current_type = hd(current).type
if event.type == current_type do
{chunks, [event | current]}
else
{[Enum.reverse(current) | chunks], [event]}
end
end)
chunks_reversed
|> then(&[Enum.reverse(current_chunk) | &1])
|> Enum.reverse()
end
end