lib/history/predicted_funding_rates/create_chunks_broadway.ex

defmodule History.PredictedFundingRates.CreateChunksBroadway do
  use Broadway
  require Logger
  alias Broadway.Message
  alias History.{PredictedFundingRates, PredictedFundingRateJobs, PredictedFundingRateChunks}

  @spec start_link(term) :: Supervisor.on_start()
  def start_link(_) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {PredictedFundingRates.CreateChunksProducer, []},
        transformer: {__MODULE__, :transform, []}
      ],
      processors: [
        default: [concurrency: 2]
      ]
    )
  end

  @impl true
  def handle_message(_, message, _) do
    message
    |> Message.update_data(&process_data/1)
  end

  def transform(event, _opts) do
    %Message{
      data: event,
      acknowledger: {__MODULE__, :ack_id, :ack_data}
    }
  end

  def ack(:ack_id, successful, failed) do
    successful
    |> Enum.each(fn m ->
      {:ok, _} = PredictedFundingRateJobs.update(m.data, %{status: "working"})
      PredictedFundingRates.PubSub.broadcast_update(m.data.id, "working")
    end)

    failed
    |> Enum.each(fn m ->
      Logger.error("could not create chunks for job: #{m.data.id}")
    end)

    :ok
  end

  defp process_data(job) do
    job |> each_chunk(&PredictedFundingRateChunks.insert/1)
    job
  end

  defp each_chunk(job, callback) do
    job.products
    |> Enum.map(fn p -> {p.venue, p.symbol, :swap} end)
    |> History.Products.by_venue_and_symbol_and_type()
    |> Enum.each(fn p ->
      build_each_chunk(job, p.venue, p.symbol, callback)
    end)
  end

  def build_each_chunk(job, venue, product_symbol, callback) do
    chunk = %PredictedFundingRates.PredictedFundingRateChunk{
      status: "enqueued",
      job: job,
      venue: venue,
      product: product_symbol
    }

    callback.(chunk)
  end
end