# lib/immich/sync.ex

defmodule Immich.Sync do
  @moduledoc """
  Callback-driven sync pipeline for streaming, processing, and acknowledgements.

  This module does not persist events directly. It requests events from the
  configured event stream module, processes them chunk-by-chunk through an event
  processor module, and acknowledges processed chunks in the same order.
  """

  alias Immich.API.Session

  defmodule EventStream do
    @moduledoc """
    Behaviour for fetching sync events and acknowledging processed chunks.

    Implementations provide `c:sync_stream/3` to return raw event payload maps
    for the requested event types, and `c:sync_ack/3` to confirm that the ack
    tokens of a processed chunk have been handled.
    """

    alias Immich.API.Session

    @typedoc "A module implementing this behaviour."
    @type t :: module()

    @typedoc "Options passed through to `c:sync_stream/3` and `c:sync_ack/3`."
    @type opts :: keyword()

    @doc "Returns a stream of raw event payload maps for the requested event types."
    @callback sync_stream(Session.t(), [String.t()], opts()) ::
                {:ok, Enumerable.t(map())} | {:error, term()}

    @doc "Acknowledges event `ack` tokens after successful processing."
    @callback sync_ack(Session.t(), [String.t()], opts()) ::
                {:ok, map() | term()} | {:error, term()}
  end

  defmodule EventProcessor do
    @moduledoc """
    Behaviour for processing sync event chunks.

    `c:process_events/2` receives chunks in which every event shares the same
    `type` — chunks are split on contiguous type runs before dispatch.
    """

    @typedoc "A module implementing this behaviour."
    @type t :: module()

    @typedoc "Options passed through to `c:process_events/2`."
    @type opts :: keyword()

    @doc "Processes a homogeneous chunk of `%Immich.Sync.Event{}` values."
    @callback process_events([Immich.Sync.Event.t()], opts()) ::
                :ok | {:error, term()}
  end

  defmodule Event do
    @moduledoc """
    A single, validated sync event as emitted by the sync stream.
    """

    @enforce_keys [:type, :data, :ack]
    defstruct [:type, :data, :ack]

    @typedoc "A validated sync event parsed from a stream payload."
    @type t :: %__MODULE__{
            type: String.t(),
            data: map(),
            ack: String.t()
          }

    @doc "Builds an `%Immich.Sync.Event{}` from a raw API payload map."
    @spec from_map(map()) :: {:ok, t()} | {:error, :invalid_event_payload}
    def from_map(payload) do
      case payload do
        %{"type" => type, "data" => data, "ack" => ack}
        when is_binary(type) and is_map(data) and is_binary(ack) ->
          {:ok, %__MODULE__{type: type, data: data, ack: ack}}

        # Anything else — wrong key types, missing keys, or not a map at all.
        _other ->
          {:error, :invalid_event_payload}
      end
    end
  end

  @typedoc """
  Options accepted by `run/5`.

    * `:batch_size` - number of events per batch before type-chunking
      (defaults to 100)
    * `:event_stream_opts` - passed through to `c:EventStream.sync_stream/3`
      and `c:EventStream.sync_ack/3`
    * `:event_processor_opts` - passed through to
      `c:EventProcessor.process_events/2`
  """
  @type run_opts :: [
          batch_size: pos_integer(),
          event_stream_opts: EventStream.opts(),
          event_processor_opts: EventProcessor.opts()
        ]

  @doc """
  Runs a single sync session against the given stream and processor modules.

  Raw payloads are pulled from `stream_handler_module.sync_stream/3`, turned
  into `%Immich.Sync.Event{}` structs, grouped into batches, split into runs
  of contiguous event types, handed to
  `event_processor_module.process_events/2`, and acknowledged through
  `stream_handler_module.sync_ack/3` — one chunk at a time, in stream order.

  Returns `{:ok, counts}` mapping event type to the number of processed
  events, or the first error encountered.
  """
  @spec run(Session.t(), [String.t()], EventStream.t(), EventProcessor.t(), run_opts()) ::
          {:ok, %{String.t() => non_neg_integer()}} | {:error, term()}
  def run(
        %Session{} = session,
        event_types,
        stream_handler_module,
        event_processor_module,
        opts \\ []
      ) do
    stream_opts = Keyword.get(opts, :event_stream_opts, [])

    case validate_run_inputs(stream_handler_module, event_processor_module) do
      :ok ->
        case stream_handler_module.sync_stream(session, event_types, stream_opts) do
          {:ok, payload_stream} ->
            process_stream(
              payload_stream,
              session,
              stream_handler_module,
              event_processor_module,
              opts
            )

          # Anything other than {:ok, stream} is returned to the caller as-is.
          error ->
            error
        end

      error ->
        error
    end
  end

  # Ensures both callback modules are loaded and export the expected functions
  # before any streaming starts. The stream handler is checked first; the
  # error tuple names whichever input failed.
  defp validate_run_inputs(stream_handler_module, event_processor_module) do
    if valid_stream_handler_module?(stream_handler_module) do
      if valid_event_processor_module?(event_processor_module) do
        :ok
      else
        {:error, {:invalid_input, :event_processor_module}}
      end
    else
      {:error, {:invalid_input, :stream_handler_module}}
    end
  end

  # A stream handler must be loadable and export both `sync_stream/3` and
  # `sync_ack/3`. The module is loaded first so the export checks are
  # meaningful; `with` short-circuits and returns `false` on the first miss.
  defp valid_stream_handler_module?(candidate) do
    with true <- Code.ensure_loaded?(candidate),
         true <- function_exported?(candidate, :sync_stream, 3) do
      function_exported?(candidate, :sync_ack, 3)
    end
  end

  # An event processor must be loadable and export `process_events/2`.
  # Loading happens first so `function_exported?/3` sees the module's exports.
  defp valid_event_processor_module?(candidate) do
    case Code.ensure_loaded?(candidate) do
      true -> function_exported?(candidate, :process_events, 2)
      false -> false
    end
  end

  # Drives the full pipeline: decode payloads, batch, split into homogeneous
  # type runs, process and acknowledge each chunk, and tally per-type counts.
  # Stops at the first failing chunk and returns that error unchanged.
  defp process_stream(
         payload_stream,
         session,
         stream_handler_module,
         event_processor_module,
         opts
       ) do
    chunk_size = Keyword.get(opts, :batch_size, 100)

    handle_chunk = fn chunk ->
      process_and_ack_chunk(chunk, session, stream_handler_module, event_processor_module, opts)
    end

    payload_stream
    |> Stream.flat_map(&to_event/1)
    |> Stream.chunk_every(chunk_size)
    |> Stream.flat_map(&contiguous_type_chunks/1)
    |> Stream.map(handle_chunk)
    |> Enum.reduce_while({:ok, %{}}, &accumulate_chunk_result/2)
  end

  # Folds one chunk outcome into the running per-type totals; halts the
  # reduction on the first error so later chunks are never processed.
  defp accumulate_chunk_result({:ok, type, count}, {:ok, totals}) do
    {:cont, {:ok, Map.update(totals, type, count, &(&1 + count))}}
  end

  defp accumulate_chunk_result({:error, _reason} = failure, {:ok, _totals}) do
    {:halt, failure}
  end

  # Processes one homogeneous chunk and acknowledges its ack tokens, returning
  # `{:ok, type, count}` on success. Any value other than `:ok` from the
  # processor, or other than `{:ok, _}` from the ack call, is returned as-is.
  defp process_and_ack_chunk(
         chunk,
         session,
         stream_handler_module,
         event_processor_module,
         opts
       ) do
    type = chunk_type(chunk)
    processor_opts = Keyword.get(opts, :event_processor_opts, [])
    stream_opts = Keyword.get(opts, :event_stream_opts, [])

    case event_processor_module.process_events(chunk, processor_opts) do
      :ok ->
        case stream_handler_module.sync_ack(session, chunk_acks(chunk), stream_opts) do
          {:ok, _response} -> {:ok, type, length(chunk)}
          ack_failure -> ack_failure
        end

      processing_failure ->
        processing_failure
    end
  end

  # Every chunk is homogeneous by construction (see contiguous_type_chunks/1),
  # so the head event's type identifies the whole chunk. The head-only match
  # raises on an empty chunk, which the pipeline never produces.
  defp chunk_type([%Event{type: type} | _]), do: type

  # Decodes one raw payload into a singleton event list, or an empty list when
  # the payload is malformed — the upstream flat_map silently drops invalid
  # payloads instead of aborting the stream.
  defp to_event(payload) do
    with {:ok, event} <- Event.from_map(payload) do
      [event]
    else
      {:error, _reason} -> []
    end
  end

  # Collects a chunk's ack tokens in event order, ready for sync_ack/3.
  defp chunk_acks(chunk), do: Enum.map(chunk, & &1.ack)

  # Splits a batch into runs of consecutive events that share the same `type`,
  # preserving input order. `Enum.chunk_by/2` starts a new chunk whenever the
  # function's result changes, which is exactly what the previous hand-rolled
  # reduce-and-reverse implementation did; it also returns `[]` for an empty
  # list, so the explicit empty-list clause is no longer needed.
  defp contiguous_type_chunks(events), do: Enum.chunk_by(events, & &1.type)
end