# lib/mosql/full_export_producer.ex

defmodule Mosql.FullExportProducer do
  @moduledoc """
  GenStage producer that emits the collections of a namespace as events for a
  full export.

  The producer stays idle until `trigger/1` is called. Demand that arrives
  before then is accumulated in `:pending_demand` and served as soon as the
  collections have been loaded through `Mosql.Schema.all_collections/1`.
  Each emitted event is a keyword list of the form
  `[namespace: ns, collection: collection]`.

  ## State

  The term handed to `init/1` is used as the producer state without any
  changes. It must be a map (or struct) that already contains the keys below,
  because they are updated with the `%{state | key: value}` syntax, which
  raises on missing keys:

    * `:export_triggered` - truthy once an export has been started
    * `:namespace` - namespace of the triggered export
    * `:collections` - collections that have not been emitted yet
    * `:exported_collections` - list of collections emitted so far
    * `:pending_demand` - demand received before the trigger (integer)
    * `:demand_filled` - sum of the demands answered with events (integer)
  """

  use GenStage

  alias Mosql.Schema

  require Logger

  @doc """
  Triggers the full export of namespace `ns`.

  Sends an asynchronous `{:trigger, ns}` cast to one of the producers of the
  `Mosql.FullExport` Broadway pipeline. A producer that has already been
  triggered ignores the request.
  """
  @spec trigger(term()) :: :ok
  def trigger(ns) do
    Logger.info("Triggered full export for namespace #{ns}")
    GenStage.cast(producer(), {:trigger, ns})
  end

  @doc """
  Asks one of the producers of the `Mosql.FullExport` pipeline to log its
  current state.

  `ns` is only used in the log message written by the producer.
  """
  @spec info_producer_status(term()) :: :ok
  def info_producer_status(ns) do
    GenStage.cast(producer(), {:status_info, ns})
  end

  @impl true
  def init(state) do
    Logger.info("Producer init with opts: #{inspect(state)}\n")
    {:producer, state}
  end

  @impl true
  def handle_cast({:trigger, ns}, state) do
    if state.export_triggered do
      Logger.info("Export already triggered, no action performed")
      {:noreply, [], state}
    else
      handle_export_triggered(ns, state)
    end
  end

  def handle_cast({:status_info, ns}, state) do
    log_state(state)
    Logger.info("Status logged for full export for namespace #{ns}")
    {:noreply, [], state}
  end

  @impl true
  def handle_demand(demand, state) do
    Logger.info("Demand received for #{demand} collections")

    if state.export_triggered do
      dispatch(state, demand)
    else
      # Nothing can be emitted before the export is triggered. Accumulate the
      # demand rather than overwrite it: handle_demand/2 may run several times
      # before the trigger and every request has to be honoured later.
      state = %{state | pending_demand: state.pending_demand + demand}

      log_state(state)
      {:noreply, [], state}
    end
  end

  # Loads the collections of `ns`, marks the export as started and serves any
  # demand that was received while the producer was idle.
  # Raises a RuntimeError when the schema returns no collection list at all.
  defp handle_export_triggered(ns, state) do
    collections = Schema.all_collections(ns)
    if is_nil(collections), do: raise("no collections loaded")

    Logger.info("Total collections to export #{inspect(Enum.count(collections))}")

    state = %{state | export_triggered: true, namespace: ns, collections: collections}

    if state.pending_demand > 0 do
      Logger.info("Handling a pending demand of #{state.pending_demand}")
      dispatch(%{state | pending_demand: 0}, state.pending_demand)
    else
      # No consumer has asked for events yet: keep everything queued and emit
      # nothing, otherwise the same collections would be emitted a second time
      # by the next handle_demand/2 call.
      log_state(state)
      {:noreply, [], state}
    end
  end

  # Emits up to `demand` collections from the front of `state.collections` and
  # records them as exported. When fewer collections than `demand` are left,
  # the unmet remainder is dropped because no further collections are loaded
  # after the trigger.
  defp dispatch(state, demand) do
    if Enum.empty?(state.collections) do
      Logger.info("No collections pending for the requested demand")
      {:noreply, [], state}
    else
      {batch, remaining} = Enum.split(state.collections, demand)

      state = %{
        state
        | collections: remaining,
          demand_filled: state.demand_filled + demand,
          exported_collections: state.exported_collections ++ batch
      }

      log_state(state)
      {:noreply, collection_data(state.namespace, batch), state}
    end
  end

  # Picks one of the pipeline's producers at random.
  # NOTE(review): the export state lives in each producer process, so this
  # presumably relies on the pipeline running a single producer - confirm
  # against the Mosql.FullExport configuration.
  defp producer do
    Enum.random(Broadway.producer_names(Mosql.FullExport))
  end

  defp log_state(state) do
    Logger.info("Full export producer state: #{inspect(state)}")
  end

  # Wraps every collection in the event shape emitted by this producer.
  defp collection_data(ns, collections) do
    Enum.map(collections, &[namespace: ns, collection: &1])
  end
end