Skip to main content

lib/cairnloop/knowledge_base/workers/chunk_revision.ex

defmodule Cairnloop.KnowledgeBase.Workers.ChunkRevision do
  use Oban.Worker, queue: :default, unique: [period: 60]

  alias Cairnloop.KnowledgeAutomation
  alias Cairnloop.KnowledgeBase.{Revision, Chunk, MarkdownParser}
  alias Cairnloop.Embedder.ExternalApi
  import Ecto.Query

  defp repo do
    Application.fetch_env!(:cairnloop, :repo)
  end

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"revision_id" => revision_id}}) do
    :telemetry.execute([:openinference, :span, :start], %{}, %{name: "chunk_revision"})
    _ = knowledge_automation().record_review_task_reindex_started(revision_id)

    result =
      case repo().get(Revision, revision_id) do
        nil ->
          {:error, :revision_not_found}

        %Revision{content: content} ->
          chunk_sections = MarkdownParser.parse_sections(content || "")
          chunk_texts = Enum.map(chunk_sections, & &1.content)

          case ExternalApi.generate_embeddings(chunk_texts) do
            {:ok, embeddings} ->
              now = NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second)

              chunk_records =
                Enum.zip(chunk_sections, embeddings)
                |> Enum.map(fn {section, vector} ->
                  %{
                    revision_id: revision_id,
                    chunk_index: section.chunk_index,
                    heading: section.heading,
                    content: section.content,
                    embedding: Pgvector.new(vector),
                    inserted_at: now,
                    updated_at: now
                  }
                end)

              Ecto.Multi.new()
              |> Ecto.Multi.delete_all(
                :delete_old_chunks,
                from(c in Chunk, where: c.revision_id == ^revision_id)
              )
              |> Ecto.Multi.insert_all(:insert_chunks, Chunk, chunk_records)
              |> repo().transaction()

              :ok

            {:error, reason} ->
              {:error, reason}
          end
      end

    :telemetry.execute([:openinference, :span, :stop], %{}, %{
      name: "chunk_revision",
      status: result
    })

    _ = knowledge_automation().record_review_task_reindex_outcome(revision_id, result)

    result
  end

  defp knowledge_automation do
    Application.get_env(:cairnloop, :knowledge_automation, KnowledgeAutomation)
  end
end