lib/step_flow/models/jobs/jobs.ex

defmodule StepFlow.Jobs do
  @moduledoc """
  The Jobs context.
  """

  require Logger

  import Ecto.Query, warn: false
  alias StepFlow.Repo

  alias StepFlow.Amqp.CommonEmitter
  alias StepFlow.Controllers.BlackList
  alias StepFlow.Controllers.Jobs
  alias StepFlow.Jobs.Job
  alias StepFlow.Jobs.Status
  alias StepFlow.Progressions.Progression
  alias StepFlow.QueryFilter
  alias StepFlow.Workers.WorkerStatuses
  alias StepFlow.Workflows

  @doc """
  Returns the list of jobs.

  ## Examples

      iex> list_jobs()
      [%Job{}, ...]

  """
  def list_jobs(params \\ %{}) do
    query =
      from(job in Job)
      |> apply_rights(params)
      |> apply_default_query_filters(params)

    internal_query_jobs(query, params)
  end

  def internal_list_jobs(params \\ %{}) do
    query =
      from(job in Job)
      |> apply_default_query_filters(params)

    internal_query_jobs(query, params)
  end

  defp internal_query_jobs(query, params) do
    page =
      Map.get(params, "page", 0)
      |> StepFlow.Integer.force()

    size =
      Map.get(params, "size", 10)
      |> StepFlow.Integer.force()

    offset = page * size

    total_query = from(item in query, select: count(item.id))

    total =
      Repo.all(total_query)
      |> List.first()

    query =
      from(
        job in query,
        order_by: [desc: :inserted_at],
        offset: ^offset,
        limit: ^size
      )

    jobs =
      Repo.all(query)
      |> Repo.preload([
        :status,
        :progressions,
        :updates,
        child_workflow: [:jobs, :artifacts, :status]
      ])

    %{
      data: jobs,
      total: total,
      page: page,
      size: size
    }
  end

  def apply_rights(query, params) do
    allowed_workflows = Workflows.check_rights(params)

    case Enum.member?(allowed_workflows, "*") do
      true ->
        query

      false ->
        allowed_job_ids = Repo.all(query) |> check_rights(allowed_workflows)

        from(
          job in query,
          where: job.id in ^allowed_job_ids
        )
    end
  end

  def apply_default_query_filters(query, params) do
    query =
      case StepFlow.Map.get_by_key_or_atom(params, "workflow_id") do
        nil ->
          query

        str_workflow_id ->
          workflow_id = StepFlow.Integer.force(str_workflow_id)
          from(job in query, where: job.workflow_id == ^workflow_id)
      end

    query =
      query
      |> QueryFilter.filter_query(params, :job_type, :name)
      |> QueryFilter.filter_query(params, :step_id)
      |> filter_status(params)
      |> filter_worker_label(params)
      |> filter_worker_version(params)
      |> filter_worker_instance_id(params)

    query =
      case Map.get(params, "direct_messaging_queue_name") do
        nil ->
          query

        direct_messaging_queue_name ->
          direct_messaging_queue_name =
            String.replace(direct_messaging_queue_name, "direct_messaging_", "")

          expected =
            %{
              id: "direct_messaging_queue_name",
              type: "string",
              value: direct_messaging_queue_name
            }
            |> Jason.encode!()

          from(
            job in query,
            where: fragment("? @> array[?::text]::jsonb[]", job.parameters, ^expected)
          )
      end

    query
    |> QueryFilter.apply_end_date_filter(params, :end_date)
    |> QueryFilter.apply_end_date_filter(params, :before_date)
    |> QueryFilter.apply_start_date_filter(params, :start_date)
    |> QueryFilter.apply_start_date_filter(params, :after_date)
  end

  @doc """
  Returns the list of jobs pending"
  """
  def get_pending_jobs_by_type do
    ids_with_progress =
      from(p in Progression,
        select: %{job_id: p.job_id}
      )

    ids_with_status =
      from(s in Status,
        select: %{job_id: s.job_id}
      )

    query =
      from(job in Job,
        left_join: progress in subquery(ids_with_progress),
        on: job.id == progress.job_id,
        left_join: status in subquery(ids_with_status),
        on: job.id == status.job_id,
        where: is_nil(progress.job_id) and is_nil(status.job_id),
        select: %{name: job.name, value: count(job.id)},
        group_by: job.name
      )

    Repo.all(query)
  end

  @doc """
  Returns the list of jobs pending"
  """
  def get_processing_jobs_by_type do
    sub_query_status =
      from(
        status in Status,
        select: %{job_id: status.job_id, inserted_at: max(status.inserted_at)},
        group_by: [status.job_id]
      )

    sub_query =
      from(
        status in Status,
        join: st in subquery(sub_query_status),
        on: status.job_id == st.job_id and status.inserted_at == st.inserted_at,
        where: status.state == :processing
      )

    query =
      from(job in Job,
        right_join: st in subquery(sub_query),
        on: st.job_id == job.id,
        select: %{name: job.name, value: count(job.id)},
        group_by: job.name
      )

    Repo.all(query)
  end

  defp check_rights(jobs, allowed_workflows) do
    Enum.map(
      jobs,
      fn job ->
        if Enum.member?(
             allowed_workflows,
             StepFlow.Map.get_by_key_or_atom(
               Workflows.get_workflow_for_job!(job.id),
               :identifier
             )
           ) do
          job.id
        end
      end
    )
  end

  def filter_status(query, params) do
    QueryFilter.filter_query_with_related_entry(
      query,
      params,
      :states,
      :id,
      Status,
      :state,
      :job_id
    )
  end

  def filter_worker_version(query, params) do
    QueryFilter.filter_query_with_related_entry(
      query,
      params,
      :versions,
      :last_worker_instance_id,
      StepFlow.Workers.WorkerStatus,
      :version,
      :instance_id
    )
  end

  def filter_worker_label(query, params) do
    QueryFilter.filter_query_with_related_entry(
      query,
      params,
      :labels,
      :last_worker_instance_id,
      StepFlow.Workers.WorkerStatus,
      :label,
      :instance_id
    )
  end

  def filter_worker_instance_id(query, params) do
    QueryFilter.filter_query_with_related_entry(
      query,
      params,
      :instance_ids,
      :last_worker_instance_id,
      StepFlow.Workers.WorkerStatus,
      :instance_id,
      :instance_id
    )
  end

  @doc """
  Gets a single job.

  Raises `Ecto.NoResultsError` if the Job does not exist.

  ## Examples

      iex> get_job!(123)
      %Job{}

      iex> get_job!(456)
      ** (Ecto.NoResultsError)

  """
  def get_job!(id), do: Repo.get!(Job, id)

  @doc """
  Gets a single job.

  ## Examples

      iex> get_job(123)
      %Job{}

      iex> get_job(456)
      nil

  """
  def get_job(id), do: Repo.get(Job, id)

  @doc """
  Gets a single job by workflow ID and step ID

  ## Examples

      iex> get_job(123)
      %Job{}

      iex> get_job(456)
      nil

  """
  def get_by!(%{"workflow_id" => workflow_id, "step_id" => step_id}) do
    Repo.get_by!(Job, workflow_id: workflow_id, step_id: step_id)
  end

  @doc """
  Gets a single job by workflow ID and step ID

  ## Examples

      iex> get_job(123)
      %Job{}

      iex> get_job(456)
      nil

  """
  def get_by(%{"workflow_id" => workflow_id, "step_id" => step_id}) do
    Repo.get_by(Job, workflow_id: workflow_id, step_id: step_id)
  end

  @doc """
  Gets a single job with its related status.

  Raises `Ecto.NoResultsError` if the Job does not exist.

  ## Examples

      iex> get_job_with_status!(123)
      %Job{}

      iex> get_job!(456)
      ** (Ecto.NoResultsError)

  """
  def get_job_with_status!(id) do
    get_job!(id)
    |> Repo.preload([
      :status,
      :progressions,
      :updates,
      child_workflow: [:jobs, :artifacts, :status]
    ])
  end

  @doc """
  Creates a job.

  ## Examples

      iex> create_job(%{field: value})
      {:ok, %Job{}}

      iex> create_job(%{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  def create_job(attrs \\ %{}) do
    %Job{}
    |> Job.changeset(attrs)
    |> Repo.insert()
  end

  def copy_job(%Job{} = job) do
    %{
      name: job.name,
      step_id: job.step_id,
      parameters: job.parameters,
      workflow_id: job.workflow_id,
      is_live: job.is_live,
      is_updatable: job.is_updatable,
      allow_failure: job.allow_failure
    }
    |> create_job()
  end

  @doc """
  Creates a job with a paused status.

  ## Examples

      iex> create_paused_job(workflow, 1, "download_http")
      {:ok, "paused"}

  """
  def create_paused_job(workflow_id, step_id, step_name, post_action) do
    job_params = %{
      name: step_name,
      step_id: step_id,
      workflow_id: workflow_id,
      parameters: []
    }

    {:ok, job} = create_job(job_params)
    Status.set_job_status(job.id, :queued)
    Status.set_job_status(job.id, :processing)
    Status.set_job_status(job.id, :paused, post_action)
  end

  @doc """
  Creates a job with a skipped status.

  ## Examples

      iex> create_skipped_job(workflow, 1, "download_http")
      {:ok, "skipped"}

  """
  def create_skipped_job(workflow, step_id, action) do
    job_params = %{
      name: action,
      step_id: step_id,
      workflow_id: workflow.id,
      parameters: []
    }

    {:ok, job} = create_job(job_params)
    Status.set_job_status(job.id, :skipped)
    {:ok, "skipped"}
  end

  @doc """
  Creates a job with an error status.

  ## Examples

      iex> create_error_job(workflow, step_id, "download_http", "unsupported step")
      {:ok, "created"}

  """
  def create_error_job(workflow, step_id, action, description) do
    job_params = %{
      name: action,
      step_id: step_id,
      workflow_id: workflow.id,
      parameters: []
    }

    {:ok, job} = create_job(job_params)
    Status.set_job_status(job.id, :queued)
    Status.set_job_status(job.id, :processing)
    Status.set_job_status(job.id, :error, %{message: description})
    {:ok, "created"}
  end

  @doc """
  Creates a job with a completed status.

  ## Examples

      iex> create_completed_job(workflow, step_id, "webhook_notification")
      {:ok, "completed"}

  """
  def create_completed_job(workflow, step_id, action) do
    job_params = %{
      name: action,
      step_id: step_id,
      workflow_id: workflow.id,
      parameters: []
    }

    {:ok, job} = create_job(job_params)
    Status.set_job_status(job.id, :queued)
    Status.set_job_status(job.id, :processing)
    Status.set_job_status(job.id, :completed)
    {:ok, "completed"}
  end

  defp abort_job(job) do
    worker_status = WorkerStatuses.get_worker_status_for_job(job.id)

    case worker_status do
      nil ->
        Logger.warn("Cannot abort job #{job.id}, no associated running worker found.")

      worker_status ->
        message =
          Jobs.get_message(job)
          |> Map.put(:type, "stop_process")

        Logger.info(
          "Send stop_process message for step #{job.id} to worker #{worker_status.instance_id}"
        )

        publish_result =
          case CommonEmitter.publish_json(
                 worker_status.direct_messaging_queue_name,
                 job.id,
                 message,
                 "direct_messaging",
                 headers: [{"instance_id", :longstr, worker_status.instance_id}]
               ) do
            :ok ->
              {:ok, "stopped"}

            _ ->
              {:error, "unable to publish message"}
          end

        publish_result
    end
  end

  def abort_jobs(workflow, step_id, action) do
    jobs =
      internal_list_jobs(%{
        name: action,
        step_id: step_id,
        workflow_id: workflow.id,
        # assuming that a workflow cannot have more than 1000 jobs
        size: 1000
      })

      # Create dedicated method
      |> Map.get(:data)

    # Black list queued jobs
    jobs
    |> Enum.filter(fn job ->
      last_status = Jobs.get_last_status(job.status)
      last_status.state == :queued
    end)
    |> Enum.each(fn job ->
      BlackList.add_and_notify!(job.id)
    end)

    # Abort running jobs
    jobs
    |> Enum.filter(fn job ->
      length(job.progressions) > 0
    end)
    |> Enum.each(fn job ->
      abort_job(job)
    end)
  end

  @doc """
  Set skipped status to all queued jobs.

  ## Examples

      iex> skip_jobs(workflow, step_id, "download_http")
      :ok

  """
  def skip_jobs(workflow, step_id, action) do
    internal_list_jobs(%{
      name: action,
      step_id: step_id,
      workflow_id: workflow.id
    })

    # Create dedicated method
    |> Map.get(:data)
    |> Enum.filter(fn job ->
      case job.status do
        [%{state: state}] -> state != "queued"
        _ -> false
      end
    end)
    |> Enum.each(fn job ->
      Status.set_job_status(job.id, :skipped)
    end)
  end

  @doc """
  Updates a job.

  ## Examples

      iex> update_job(job, %{field: new_value})
      {:ok, %Job{}}

      iex> update_job(job, %{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  def update_job(%Job{} = job, attrs) do
    job
    |> Job.changeset(attrs)
    |> Repo.update()
  end

  @doc """
  Deletes a Job.

  ## Examples

      iex> delete_job(job)
      {:ok, %Job{}}

      iex> delete_job(job)
      {:error, %Ecto.Changeset{}}

  """
  def delete_job(%Job{} = job) do
    job
    |> Job.changeset(%{deleted: true})
    |> Repo.update()
  end

  @doc """
  Returns an `%Ecto.Changeset{}` for tracking job changes.

  ## Examples

      iex> change_job(job)
      %Ecto.Changeset{source: %Job{}}

  """
  def change_job(%Job{} = job) do
    Job.changeset(job, %{})
  end
end