lib/history/candle_history_jobs.ex

defmodule History.CandleHistoryJobs do
  require Ecto.Query
  require Indifferent
  import Ecto.Query
  alias History.Repo
  alias History.Candles.CandleHistoryJob, as: Job

  @spec get!(Job.id()) :: Job.t()
  def get!(id) do
    Repo.get!(Job, id)
  end

  @default_latest_page 1
  @default_latest_page_size 25
  @max_page_size 100

  def latest(opts) do
    page = max((opts[:page] || @default_latest_page) - 1, 0)
    page_size = min(opts[:page_size] || @default_latest_page_size, @max_page_size)
    offset = page * page_size

    from(
      Job,
      order_by: [desc: :inserted_at],
      offset: ^offset,
      limit: ^page_size
    )
    |> Repo.all()
  end

  @spec count :: non_neg_integer
  def count do
    Repo.aggregate(Job, :count)
  end

  def enqueued_after(id, count) do
    from(
      j in Job,
      where: j.id > ^id and j.status == "enqueued",
      order_by: [asc: :id],
      limit: ^count
    )
    |> Repo.all()
  end

  def job_changeset_today(params) do
    now = DateTime.utc_now()
    {:ok, date_now} = Date.new(now.year, now.month, now.day)
    {:ok, time_now} = Time.new(now.hour, 0, 0)
    {:ok, current_hour} = DateTime.new(date_now, time_now)
    from = current_hour |> Timex.shift(days: -1)
    to = current_hour |> Timex.shift(hours: 1)

    merged_params =
      Map.merge(
        params,
        %{
          from_date: from,
          from_time: from,
          to_date: to,
          to_time: to
        }
      )

    Job.changeset(%Job{}, merged_params)
  end

  def insert(params) do
    changeset = Job.changeset(%Job{}, params)
    Repo.insert(changeset)
  end

  def update(job, params) do
    changeset = Job.changeset(job, params)
    Repo.update(changeset)
  end

  def cancel(job) do
    changeset = Job.changeset(job, %{status: :canceled})
    Repo.update(changeset)
  end

  def broadcast(job, pub_sub \\ Tai.PubSub) do
    msg = %{id: job.id, status: job.status}

    [
      "candle_history_job:#{job.id}",
      "candle_history_job:*"
    ]
    |> Enum.each(fn topic ->
      Phoenix.PubSub.broadcast(pub_sub, topic, {topic, msg})
    end)
  end

  def subscribe_by_id(id, pub_sub \\ Tai.PubSub) do
    topic = "candle_history_job:#{id}"
    Phoenix.PubSub.subscribe(pub_sub, topic)
  end
end