lib/history/lending_rates/create_chunks_broadway.ex

defmodule History.LendingRates.CreateChunksBroadway do
  use Broadway
  require Logger
  alias Broadway.Message

  alias History.{
    DataAdapter,
    RangeJob,
    LendingRateHistoryChunks,
    LendingRateHistoryJobs,
    LendingRates
  }

  @spec start_link(term) :: Supervisor.on_start()
  def start_link(_) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {LendingRates.CreateChunksProducer, []},
        transformer: {__MODULE__, :transform, []}
      ],
      processors: [
        default: [concurrency: 2]
      ]
    )
  end

  @impl true
  def handle_message(_, message, _) do
    message
    |> Message.update_data(&process_data/1)
  end

  def transform(event, _opts) do
    %Message{
      data: event,
      acknowledger: {__MODULE__, :ack_id, :ack_data}
    }
  end

  def ack(:ack_id, successful, failed) do
    successful
    |> Enum.each(fn m ->
      {:ok, _} = LendingRateHistoryJobs.update(m.data, %{status: "working"})
      LendingRates.PubSub.broadcast_update(m.data.id, "working")
    end)

    failed
    |> Enum.each(fn m ->
      Logger.error("could not create chunks for job: #{m.data.id}")
    end)

    :ok
  end

  defp process_data(job) do
    job |> each_chunk(&LendingRateHistoryChunks.insert/1)
    job
  end

  def each_chunk(job, callback) do
    {:ok, start_at} = RangeJob.from(job)
    {:ok, end_at} = RangeJob.to(job)

    job.tokens
    |> Enum.map(fn t -> {t.venue, t.symbol} end)
    |> History.Tokens.by_venue_and_symbol()
    |> Enum.each(fn %{venue: venue, symbol: symbol} ->
      with {:ok, lending_rate_adapter} = DataAdapter.for_venue(venue, :lending_rates),
           {:ok, period} <- lending_rate_adapter.period(),
           {:ok, periods_per_chunk} <- lending_rate_adapter.periods_per_chunk() do
        build_each_chunk(
          job,
          venue,
          symbol,
          start_at,
          end_at,
          period,
          periods_per_chunk,
          callback
        )
      end
    end)
  end

  def build_each_chunk(
        job,
        venue,
        token,
        start_at,
        end_at,
        period,
        periods_per_chunk,
        callback
      ) do
    if Timex.before?(start_at, end_at) do
      chunk_end_at = DateTime.add(start_at, period * periods_per_chunk, :second)
      min_chunk_end_at = Tai.DateTime.min(chunk_end_at, end_at)

      chunk = %LendingRates.LendingRateHistoryChunk{
        status: "enqueued",
        job: job,
        venue: venue,
        token: token,
        start_at: start_at,
        end_at: min_chunk_end_at
      }

      callback.(chunk)

      build_each_chunk(
        job,
        venue,
        token,
        min_chunk_end_at,
        end_at,
        period,
        periods_per_chunk,
        callback
      )
    end
  end
end