# lib/sanity/sync.ex

defmodule Sanity.Sync do
  @moduledoc """
  For syncing content from Sanity CMS to Ecto.

  ## Suggested strategy for syncing

  * Call `sync/2` when a webhook is called to immediately create, update, or delete the document.
  * Use `sync_all/1` for the initial import of documents and to reconcile created/updated webhooks
    that were missed.
  * Use `reconcile_deleted/1` to reconcile any deleted webhooks that were missed.
  """

  require Logger
  import Ecto.Query
  alias Sanity.Sync.Doc
  import UnsafeAtomizeKeys

  # Doc strings for the option entries that are shared by several schemas in this module.
  @callback_opt_doc "Callback function that will be called after the document is upserted. It will be passed a map like `%{doc: doc, repo: repo}`. This callback is not called when the record is deleted."
  @request_opts_opt_doc "Options to be passed to `Sanity.request/2`."

  # `{key, spec}` pair for the optional `:callback` option (a 1-arity function).
  @callback_opt_schema {:callback, [type: {:fun, 1}, doc: @callback_opt_doc]}

  # `{key, spec}` pair for the required `:request_opts` keyword list.
  @request_opts_opt_schema {:request_opts,
                            [type: :keyword_list, required: true, doc: @request_opts_opt_doc]}

  # Looks up the Ecto repo module under the `:repo` key of the `:sanity_sync` application
  # environment. The lookup happens on every call (not at compile time), and
  # `Application.fetch_env!/2` raises when the key is not configured.
  defp repo do
    Application.fetch_env!(:sanity_sync, :repo)
  end

  @doc """
  Looks up the stored document with the given `id`.

  When a `Sanity.Sync.Doc` record exists, its `doc` field is passed through
  `unsafe_atomize_keys/1` and returned. Returns `nil` when no record exists.
  """
  def get_doc(id) do
    # A `nil` lookup result does not match the pattern, so `with` returns it unchanged.
    with %Doc{doc: payload} <- repo().get(Doc, id) do
      unsafe_atomize_keys(payload)
    end
  end

  @doc """
  Same lookup as `get_doc/1`, but raises `Ecto.NoResultsError` instead of returning `nil` when
  no `Sanity.Sync.Doc` record exists for `id`.
  """
  def get_doc!(id) do
    %Doc{doc: payload} = repo().get!(Doc, id)
    unsafe_atomize_keys(payload)
  end

  @reconcile_deleted_opts_schema [
    @request_opts_opt_schema,
    {:batch_size,
     [type: :pos_integer, default: 200, doc: "Number of records to fetch per batch."]}
  ]

  @doc """
  Removes `Sanity.Sync.Doc` records from Ecto whose documents can no longer be found in
  Sanity CMS.

  The ids stored in Ecto are walked in batches of `:batch_size`. Each batch is looked up in
  Sanity, and every id that Sanity does not return is collected. All collected ids are then
  deleted with a single `delete_all` call.

  Returns `nil` when nothing had to be deleted, otherwise the result of the `delete_all` call.

  ## Options

  #{NimbleOptions.docs(@reconcile_deleted_opts_schema)}
  """
  def reconcile_deleted(opts) do
    validated = NimbleOptions.validate!(opts, @reconcile_deleted_opts_schema)
    batch_size = Keyword.fetch!(validated, :batch_size)
    request_opts = Keyword.fetch!(validated, :request_opts)

    orphaned_ids =
      batch_size
      |> stream_ecto_ids()
      |> Stream.chunk_every(batch_size)
      |> Enum.flat_map(fn local_ids ->
        # Ask Sanity which of the locally known ids still exist.
        remote_ids =
          [
            projection: "{ _id }",
            query: "_id in $ids",
            request_opts: request_opts,
            variables: %{ids: local_ids}
          ]
          |> stream()
          |> Enum.map(fn remote_doc -> remote_doc._id end)

        # Whatever Sanity did not return is gone upstream.
        local_ids -- remote_ids
      end)

    if orphaned_ids == [] do
      nil
    else
      Logger.warning(
        "deleting #{length(orphaned_ids)} records: #{inspect(orphaned_ids, limit: :infinity)}"
      )

      repo().delete_all(from d in Doc, where: d.id in ^orphaned_ids)
    end
  end

  # Lazily enumerates every `Sanity.Sync.Doc` id stored in Ecto, in ascending id order.
  #
  # Pages of at most `batch_size` ids are loaded with separate `repo().all/1` calls, each one
  # continuing after the last id of the previous page. Unlike `Ecto.Repo.stream`, no
  # transaction is held open while the stream is being consumed.
  defp stream_ecto_ids(batch_size) do
    base_query = from(d in Doc, select: d.id, order_by: d.id, limit: ^batch_size)

    # Loads one page and pairs it with the cursor for the next page. `List.last/1` gives `nil`
    # for an empty page, which ends the unfold on the following step.
    fetch_page = fn page_query ->
      page = repo().all(page_query)
      {page, List.last(page)}
    end

    :first_page
    |> Stream.unfold(fn
      nil -> nil
      :first_page -> fetch_page.(base_query)
      cursor -> fetch_page.(from(d in base_query, where: d.id > ^cursor))
    end)
    |> Stream.concat()
  end

  @sync_opts_schema [@callback_opt_schema, @request_opts_opt_schema]

  @doc """
  Synchronizes the single document identified by `id`.

  The document is requested from Sanity. When it is found, it is handed to
  `upsert_sanity_doc!/2` (together with the `:callback` option, if given). When it is not
  found, the `Sanity.Sync.Doc` record with that id is deleted from Ecto.

  ## Options

  #{NimbleOptions.docs(@sync_opts_schema)}
  """
  def sync(id, opts) when is_binary(id) do
    validated = NimbleOptions.validate!(opts, @sync_opts_schema)

    fetched =
      [
        batch_size: 1,
        query: "_id == $id",
        request_opts: validated[:request_opts],
        variables: %{id: id}
      ]
      |> stream()
      |> Enum.take(1)

    case fetched do
      [] ->
        repo().delete_all(from d in Doc, where: d.id == ^id)

      [sanity_doc] ->
        upsert_sanity_doc!(sanity_doc, Keyword.take(validated, [:callback]))
    end
  end

  @sync_all_opts_schema [
    @callback_opt_schema,
    @request_opts_opt_schema,
    types: [
      type: {:list, :string},
      required: true,
      doc: "List of types to sync."
    ]
  ]

  @doc """
  Fetches all documents of the given `:types` from Sanity and calls `upsert_sanity_doc!/2`
  for each of them.

  Documents are upserted one at a time as they are read from the stream. If an upsert or the
  `:callback` raises, the exception propagates and the remaining documents are not processed.

  Returns `:ok`.

  ## Options

  #{NimbleOptions.docs(@sync_all_opts_schema)}
  """
  def sync_all(opts) do
    opts = NimbleOptions.validate!(opts, @sync_all_opts_schema)

    # The options forwarded to `upsert_sanity_doc!/2` are the same for every document, so
    # build them once instead of once per document.
    upsert_opts = Keyword.take(opts, [:callback])

    [
      query: "_type in $types",
      variables: %{types: opts[:types]},
      request_opts: opts[:request_opts]
    ]
    |> stream()
    |> Enum.each(&upsert_sanity_doc!(&1, upsert_opts))
  end

  @upsert_sanity_doc_opts_schema [
    @callback_opt_schema
  ]

  @doc """
  Inserts the given Sanity document as a `Sanity.Sync.Doc`, or replaces the existing record
  with the same id.

  `doc` must be a map containing at least the `:_id` and `:_type` keys. After the record has
  been written, the `:callback` option (if given) is invoked with `%{doc: doc, repo: repo}`.
  The callback's return value is ignored.

  Returns the result of the repo's `insert!` call. Raises if that call raises.

  ## Options

  #{NimbleOptions.docs(@upsert_sanity_doc_opts_schema)}
  """
  def upsert_sanity_doc!(%{_id: id, _type: type} = doc, opts \\ []) do
    validated = NimbleOptions.validate!(opts, @upsert_sanity_doc_opts_schema)

    record =
      %Doc{}
      |> Doc.changeset(%{doc: doc, id: id, type: type})
      |> repo().insert!(conflict_target: :id, on_conflict: :replace_all)

    # Notify the caller only after the write succeeded; the callback result is discarded.
    case Keyword.fetch(validated, :callback) do
      {:ok, callback} -> callback.(%{doc: doc, repo: repo()})
      :error -> nil
    end

    record
  end

  # Streams documents from the configured Sanity client. The client module is read from the
  # `:sanity_client` key of the `:sanity_sync` application environment and defaults to `Sanity`.
  # Every streamed document is passed through `unsafe_atomize_keys/2` with
  # `Inflex.underscore/1` as its second argument.
  defp stream(opts) do
    client = Application.get_env(:sanity_sync, :sanity_client, Sanity)
    atomize = fn raw_doc -> unsafe_atomize_keys(raw_doc, &Inflex.underscore/1) end

    opts
    |> client.stream()
    |> Stream.map(atomize)
  end
end