lib/baz/collection_imports/jobs/pull_collection.ex

defmodule Baz.CollectionImports.Jobs.PullCollection do
  @moduledoc """
  Fetch collection data from a venue and normalize before passing through the sink pipeline
  """

  use Oban.Worker, queue: :pull, tags: ["baz", "venue_collection"]
  import Baz.FormatLogger

  defmodule Input do
    defstruct ~w[import sinks]a
  end

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"id" => id}}) do
    collection_import = Baz.CollectionImports.get_collection_import!(id)
    collection_sinks = Baz.NormalizedSinks.get_collection_normalized_sinks()

    %Input{
      import: collection_import,
      sinks: collection_sinks
    }
    |> start_import()
    |> fetch_and_upsert()
    |> complete_import()
  rescue
    e ->
      "unhandled error pulling collection error=~s, stacktrace=~s"
      |> log_error([inspect(e), inspect(__STACKTRACE__)])
  end

  defp start_import(input) do
    "start execution of collection import id=~s" |> log_info([input.import.id])
    {:ok, collection_import} = update_import_status(input.import, "executing")
    %{input | import: collection_import}
  end

  defp fetch_and_upsert(input) do
    case fetch(input.import) do
      %Ecto.Changeset{} = changeset ->
        result = input.sinks |> Enum.map(fn s -> s.receive_collection_import(changeset) end)
        {input, {:ok, result}}

      {:error, reason} = error ->
        "could not fetch collection from venue=~s, slug=~s, reason=~s"
        |> log_error([input.import.venue, input.import.slug, inspect(reason)])

        {input, error}
    end
  end

  defp complete_import({input, fetch_and_upsert_result}) do
    "complete execution of collection import id=~s" |> log_info([input.import.id])
    {:ok, _collection_import} = update_import_status(input.import, "completed")
    fetch_and_upsert_result
  end

  defp update_import_status(collection_import, status) do
    Baz.CollectionImports.update_collection_import(collection_import, %{status: status})
  end

  defp fetch(collection_import) do
    try do
      venue = Baz.Venues.get_venue!(collection_import.venue)

      Baz.VenueAdapter.fetch_collection_by_slug(
        venue,
        collection_import.slug
      )
    rescue
      reason -> {:error, reason}
    end
  end
end