lib/step_flow/workflows/workflows.ex

defmodule StepFlow.Workflows do
  @moduledoc """
  The Workflows context.
  """

  import Ecto.Query, warn: false

  alias StepFlow.Artifacts.Artifact
  alias StepFlow.Jobs
  alias StepFlow.Jobs.Status
  alias StepFlow.Progressions.Progression
  alias StepFlow.QueryFilter
  alias StepFlow.Repo
  alias StepFlow.Roles
  alias StepFlow.Workflows
  alias StepFlow.Workflows.Workflow

  require Logger

  @doc """
  Returns one page of workflows matching the given filters.

  `params` is a map of request parameters. The `"page"` (default `0`) and
  `"size"` (default `10`) entries drive the pagination; the whole map is also
  handed to `apply_default_query_filters/2` to narrow the result set.

  The returned map contains:

    * `:data` - the workflows of the requested page, most recently inserted
      first, with jobs, artifacts and statuses preloaded and then passed
      through `preload_workflows/2`
    * `:total` - the number of workflows matching the filters, all pages
      included
    * `:page` and `:size` - the pagination values that were used

  ## Examples

      iex> list_workflows()
      %{data: [%Workflow{}, ...], total: 1, page: 0, size: 10}

  """
  def list_workflows(params \\ %{}) do
    page = params |> Map.get("page", 0) |> StepFlow.Integer.force()
    size = params |> Map.get("size", 10) |> StepFlow.Integer.force()
    offset = page * size

    filtered = apply_default_query_filters(from(workflow in Workflow), params)

    # Count on the filtered set before pagination is applied.
    total =
      from(item in subquery(filtered), select: count(item.id))
      |> Repo.all()
      |> List.first()

    page_query =
      from(
        workflow in subquery(filtered),
        order_by: [desc: :inserted_at],
        offset: ^offset,
        limit: ^size
      )

    workflows =
      page_query
      |> Repo.all()
      |> Repo.preload([:jobs, :artifacts, :status])
      |> preload_workflows()

    %{data: workflows, total: total, page: page, size: size}
  end

  @doc """
  Applies every standard workflow filter found in `params` to `query`.

  The query is wrapped in a sub-query and narrowed by, in order: the `:search`
  term, the `QueryFilter` field filters (`:reference`, `:identifier`, version
  fields, `:is_live`, `:deleted`, `:user_uuid`), the `"mode"` filter, the
  `:before_date` / `:after_date` filters and the `:states` filter.

  Visibility is then restricted with the identifiers returned by
  `check_rights/1`: when that list contains `"*"` only an explicit
  `:workflow_ids` parameter restricts the identifiers, otherwise the allowed
  identifiers are used (intersected with `:workflow_ids` when it is given).

  Finally an `:ids` parameter, when present, restricts the workflow ids.
  """
  def apply_default_query_filters(query, params \\ %{}) do
    filtered =
      from(workflow in subquery(query))
      |> search_for_reference(params, :search)
      |> QueryFilter.filter_query(params, :reference)
      |> QueryFilter.filter_query(params, :identifier)
      |> QueryFilter.filter_query(params, :version_major)
      |> QueryFilter.filter_query(params, :version_minor)
      |> QueryFilter.filter_query(params, :version_micro)
      |> QueryFilter.filter_query(params, :is_live)
      |> QueryFilter.filter_query(params, :deleted)
      |> QueryFilter.filter_query(params, :user_uuid)
      |> filter_mode(params)
      |> QueryFilter.apply_before_date_filter(params, :before_date)
      |> QueryFilter.apply_after_date_filter(params, :after_date)
      |> filter_status(params, :states)

    allowed_workflows = check_rights(params)
    requested = StepFlow.Map.get_by_key_or_atom(params, :workflow_ids)
    unrestricted? = Enum.member?(allowed_workflows, "*")

    # `nil` means "no restriction on the identifier".
    identifiers =
      cond do
        requested == nil and unrestricted? -> nil
        requested == nil -> allowed_workflows
        unrestricted? -> requested
        true -> Enum.filter(requested, fn element -> element in allowed_workflows end)
      end

    scoped =
      if identifiers == nil do
        filtered
      else
        from(workflow in filtered, where: workflow.identifier in ^identifiers)
      end

    case StepFlow.Map.get_by_key_or_atom(params, :ids) do
      nil -> scoped
      ids -> from(workflow in scoped, where: workflow.id in ^ids)
    end
  end

  # Narrows `query` to workflows whose reference contains the search term
  # stored under `field` in `params` (case-insensitive `ilike` match).
  # Without a search term the query is returned untouched.
  # NOTE(review): `%` and `_` in the term are not escaped, so they act as
  # LIKE wildcards - confirm this is intended.
  defp search_for_reference(query, params, field) do
    term = StepFlow.Map.get_by_key_or_atom(params, field)

    if term == nil do
      query
    else
      pattern = "%#{term}%"

      from(
        workflow in subquery(query),
        where: ilike(workflow.reference, ^pattern)
      )
    end
  end

  @doc """
  Returns the workflow identifiers the roles found in `params` may view.

  The `:roles` entry of `params` (default `[]`) is resolved with
  `Roles.get_roles/1`, then the rights for entity type `"workflow"` and action
  `"view"` are looked up. For each right carrying an `:entity`, the part after
  the last `"::"` separator is returned. When no roles are resolved the result
  is an empty list.
  """
  def check_rights(params) do
    roles =
      params
      |> StepFlow.Map.get_by_key_or_atom(:roles, [])
      |> Roles.get_roles()

    view_rights =
      if roles == nil do
        []
      else
        Roles.get_rights_for_entity_type_and_action(roles, "workflow", "view")
      end

    # Rights without an `:entity` key are ignored.
    view_rights
    |> Enum.filter(&match?(%{entity: _}, &1))
    |> Enum.map(fn %{entity: entity} ->
      entity
      |> String.split("::")
      |> List.last()
    end)
  end

  # Restricts `query` according to the "mode" entry of `params` (string key
  # only):
  #
  #   * no mode, or both "live" and "file" -> no restriction
  #   * only "live"                        -> workflows with `is_live == true`
  #   * only "file"                        -> workflows with `is_live == false`
  #
  # The list is normalized first so that the order of the modes and repeated
  # entries do not matter; previously `["file", "live"]` matched no clause and
  # raised. Any other value still raises a `CaseClauseError`, as before.
  defp filter_mode(query, params) do
    case normalize_modes(Map.get(params, "mode")) do
      nil ->
        from(workflow in query)

      ["file", "live"] ->
        from(workflow in query)

      ["live"] ->
        from(
          workflow in query,
          where: workflow.is_live == true
        )

      ["file"] ->
        from(
          workflow in query,
          where: workflow.is_live == false
        )
    end
  end

  # Deduplicates and sorts a list of modes; any non-list value is passed through.
  defp normalize_modes(modes) when is_list(modes), do: modes |> Enum.uniq() |> Enum.sort()
  defp normalize_modes(modes), do: modes

  @doc """
  Restricts `query` to workflows whose selected status is one of the states
  stored under `key` in `params`.

  When `params` has no value for `key` the query is returned unchanged.
  Otherwise the workflows are joined with a sub-query over `Workflows.Status`
  that is distinct on `workflow_id` and ordered by descending id, and only the
  rows whose `state` is in the requested list are kept.
  """
  def filter_status(query, params, key) do
    states = StepFlow.Map.get_by_key_or_atom(params, key)

    if states == nil do
      query
    else
      # One status row per workflow; presumably the most recent one, given the
      # descending order on the status id.
      latest_statuses =
        from(
          workflow_status in Workflows.Status,
          order_by: [desc: workflow_status.id, desc: workflow_status.workflow_id],
          distinct: [desc: workflow_status.workflow_id]
        )

      from(
        workflow in query,
        join: workflow_status in subquery(latest_statuses),
        on: workflow.id == workflow_status.workflow_id,
        where: workflow_status.state in ^states
      )
    end
  end

  @doc """
  Gets a single workflow.

  The workflow is returned with its jobs and artifacts preloaded; the jobs
  carry their statuses and progressions, and every step is completed with a
  `:status` and a `:jobs` summary.

  Raises `Ecto.NoResultsError` if the Workflow does not exist.

  ## Examples

      iex> get_workflow!(123)
      %Workflow{}

      iex> get_workflow!(456)
      ** (Ecto.NoResultsError)

  """
  def get_workflow!(id) do
    workflow = Repo.get!(Workflow, id)

    workflow
    |> Repo.preload([:jobs, :artifacts])
    |> preload_workflow()
  end

  @doc """
  Gets the workflow that contains the job with the specified ID.

  The job is loaded with `Jobs.get_job!/1` and its `workflow_id` is then
  resolved through `get_workflow!/1`.

  Raises `Ecto.NoResultsError` if the Workflow does not exist.

  ## Examples

      iex> get_workflow_for_job!(19)
      %Workflow{}

      iex> get_workflow_for_job!(456)
      ** (Ecto.NoResultsError)

  """
  def get_workflow_for_job!(job_id) do
    job_id
    |> Jobs.get_job!()
    |> Map.fetch!(:workflow_id)
    |> get_workflow!()
  end

  # Completes an already loaded workflow: its jobs get their statuses and
  # progressions preloaded, and its steps are decorated with a status and a
  # job summary computed from those jobs.
  defp preload_workflow(workflow) do
    jobs = Repo.preload(workflow.jobs, [:status, :progressions])
    steps = get_steps_with_status(Map.get(workflow, :steps), jobs)

    Map.merge(workflow, %{steps: steps, jobs: jobs})
  end

  @doc """
  Completes every workflow of the list (jobs with statuses and progressions,
  steps with their status) and appends the results, in order, to `result`.

  `result` defaults to `[]`. An empty `workflows` list returns `result`
  unchanged.
  """
  def preload_workflows(workflows, result \\ [])
  def preload_workflows([], result), do: result

  def preload_workflows(workflows, result) when is_list(workflows) do
    # A single map plus one concatenation keeps this linear, instead of
    # appending to the end of the accumulator for every workflow.
    result ++ Enum.map(workflows, &preload_workflow/1)
  end

  @doc """
  Decorates each step with the state of its jobs and appends the decorated
  steps, in order, to `result` (default `[]`).

  A job belongs to a step when both its `name` and its `step_id` match the
  step's name and id. Each step receives two extra atom keys:

    * `:jobs` - the per-status job counters of the step
    * `:status` - the step status derived from those counters

  `nil` or empty `steps` return `result` unchanged.
  """
  def get_steps_with_status(steps, workflow_jobs, result \\ [])
  def get_steps_with_status([], _workflow_jobs, result), do: result
  def get_steps_with_status(nil, _workflow_jobs, result), do: result

  def get_steps_with_status(steps, workflow_jobs, result) when is_list(steps) do
    # A single map plus one concatenation keeps this linear, instead of
    # appending to the end of the accumulator for every step.
    result ++ Enum.map(steps, &put_step_status(&1, workflow_jobs))
  end

  # Adds the `:status` and `:jobs` entries to a single step.
  defp put_step_status(step, workflow_jobs) do
    name = StepFlow.Map.get_by_key_or_atom(step, :name)
    step_id = StepFlow.Map.get_by_key_or_atom(step, :id)

    job_status =
      workflow_jobs
      |> Enum.filter(fn job -> job.name == name && job.step_id == step_id end)
      |> count_step_jobs_status()

    step
    |> Map.put(:status, get_step_status_from_job_status(job_status))
    |> Map.put(:jobs, job_status)
  end

  # Builds the job counters of a step: the total number of jobs plus, for each
  # tracked status, the number of jobs currently counted in that status.
  defp count_step_jobs_status(jobs) do
    # Counter key => job status it counts.
    counters = [
      completed: :completed,
      errors: :error,
      paused: :paused,
      processing: :processing,
      queued: :queued,
      skipped: :skipped,
      stopped: :stopped
    ]

    counters
    |> Map.new(fn {key, status} -> {key, count_status(jobs, status)} end)
    |> Map.put(:total, length(jobs))
  end

  # Derives the status of a step from its job counters. The first counter above
  # zero, in the order listed below, decides; queued jobs make the step
  # :processing. With every counter at zero the step is :pending.
  defp get_step_status_from_job_status(job_status) do
    precedence = [
      errors: :error,
      processing: :processing,
      queued: :processing,
      stopped: :stopped,
      skipped: :skipped,
      completed: :completed,
      paused: :paused
    ]

    Enum.find_value(precedence, :pending, fn {counter, status} ->
      if Map.fetch!(job_status, counter) > 0, do: status
    end)
  end

  @doc """
  Returns the step definition a job was created from, with its workflow.

  The job's workflow is preloaded (with its jobs) and the first step whose
  `"id"` entry equals the job's `step_id` is looked up. The result is a map
  `%{step: step, workflow: workflow}` where `step` is `nil` when no step
  matches.
  """
  def get_step_definition(job) do
    job = Repo.preload(job, workflow: [:jobs])
    workflow = job.workflow

    step = Enum.find(workflow.steps, fn step -> Map.get(step, "id") == job.step_id end)

    %{step: step, workflow: workflow}
  end

  # Counts, starting from `count`, the jobs considered to be in `status`.
  defp count_status(jobs, status, count \\ 0)
  defp count_status([], _status, count), do: count

  defp count_status([job | remaining], status, count) do
    # A job with at least one status.state at :completed is considered :completed
    completed? = Enum.count(job.status, fn s -> s.state == :completed end) >= 1

    updated =
      cond do
        not completed? -> count_uncompleted_status(status, job, count)
        status == :completed -> count + 1
        true -> count
      end

    count_status(remaining, status, updated)
  end

  # Dispatches the counting of a job that has no :completed status entry to the
  # counter dedicated to `status`, and returns the updated count.
  #
  # :completed leaves the count untouched, since such a job is by definition
  # not completed. Any other status is a programming error and raises a
  # RuntimeError naming the offending value. The log call and return value
  # that used to follow the raise could never run and were removed.
  defp count_uncompleted_status(status, job, count) do
    case status do
      :processing -> count_processing(job, count)
      :error -> count_error(job, count)
      :paused -> count_paused(job, count)
      :skipped -> count_skipped(job, count)
      :stopped -> count_stopped(job, count)
      :queued -> count_queued(job, count)
      :completed -> count
      _ -> raise RuntimeError, "unsupported job status: #{inspect(status)}"
    end
  end

  # Increments `count` when the job is considered to be processing:
  #
  #   * it has at least one progression, and
  #   * it has no status at all, or its last progression was updated after its
  #     last status, or both were updated at the same time and that last
  #     status is :processing.
  defp count_processing(%{progressions: []}, count), do: count

  defp count_processing(job, count) do
    latest_progression = Progression.get_last_progression(job.progressions)

    case Status.get_last_status(job.status) do
      nil ->
        count + 1

      latest_status ->
        case NaiveDateTime.compare(latest_progression.updated_at, latest_status.updated_at) do
          :gt ->
            count + 1

          :eq ->
            if latest_status.state == :processing, do: count + 1, else: count

          _ ->
            count
        end
    end
  end

  # Increments `count` when the last status of the job is :error.
  defp count_error(job, count) do
    case Status.get_last_status(job.status) do
      %{state: :error} -> count + 1
      _other -> count
    end
  end

  # Increments `count` when the last status of the job is :paused.
  defp count_paused(job, count) do
    case Status.get_last_status(job.status) do
      %{state: :paused} -> count + 1
      _other -> count
    end
  end

  # Increments `count` when the last status of the job is :skipped.
  defp count_skipped(job, count) do
    case Status.get_last_status(job.status) do
      %{state: :skipped} -> count + 1
      _other -> count
    end
  end

  # Increments `count` when the last status of the job is :stopped.
  defp count_stopped(job, count) do
    case Status.get_last_status(job.status) do
      %{state: :stopped} -> count + 1
      _other -> count
    end
  end

  # Increments `count` when the job is considered to be queued:
  #
  #   * it has neither a status nor a progression, or
  #   * it has no progression and its last status is :queued or :retrying, or
  #   * its last status is :retrying and its last progression was not updated
  #     after that status.
  defp count_queued(job, count) do
    latest_status = Status.get_last_status(job.status)
    latest_progression = Progression.get_last_progression(job.progressions)

    queued? =
      case {latest_status, latest_progression} do
        {nil, nil} ->
          true

        {nil, _progression} ->
          false

        {%{state: state}, nil} when state in [:queued, :retrying] ->
          true

        {%{state: :retrying}, progression} ->
          NaiveDateTime.compare(progression.updated_at, latest_status.updated_at) != :gt

        _ ->
          false
      end

    if queued?, do: count + 1, else: count
  end

  @doc """
  Creates a workflow.

  `attrs` (default `%{}`) is run through `Workflow.changeset/2` on an empty
  `%Workflow{}` and the resulting changeset is inserted.

  ## Examples

      iex> create_workflow(%{field: value})
      {:ok, %Workflow{}}

      iex> create_workflow(%{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  def create_workflow(attrs \\ %{}) do
    changeset = Workflow.changeset(%Workflow{}, attrs)
    Repo.insert(changeset)
  end

  @doc """
  Updates a workflow.

  `attrs` is run through `Workflow.changeset/2` on the given workflow and the
  resulting changeset is persisted.

  ## Examples

      iex> update_workflow(workflow, %{field: new_value})
      {:ok, %Workflow{}}

      iex> update_workflow(workflow, %{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  def update_workflow(%Workflow{} = workflow, attrs) do
    changeset = Workflow.changeset(workflow, attrs)
    Repo.update(changeset)
  end

  @doc """
  Sends a job error message to the process registered as
  `:step_flow_slack_bot`.

  The message is the tuple `{:message, text, channel}`, where `text` names the
  job, links to its workflow under `exposed_domain_name` and quotes
  `description` in a code block.
  """
  def notification_slack_from_job(job, channel, exposed_domain_name, description) do
    text =
      "Error for job #{job.name} ##{job.id} <#{exposed_domain_name}/workflows/#{job.workflow_id} |Open Workflow>\n```#{description}```"

    send(:step_flow_slack_bot, {:message, text, channel})
  end

  @doc """
  Posts a job error notification, formatted as a `MessageCard`, to the Teams
  webhook `url`.

  The card links to the workflow under `exposed_domain_name`, lists the
  workflow reference, id and identifier together with the job id and name, and
  quotes `description` in a code block.

  Returns:

    * `{:ok, body}` when the webhook answers with status `200`
    * `:ok` (the result of `Logger.error/1`) after logging a response with any
      other status - kept as is for backward compatibility
    * `{:error, reason}` after logging when the request itself fails; this
      used to raise a `MatchError`, which prevented the caller from sending
      its remaining notifications
  """
  def notification_teams_from_job(job, url, workflow, exposed_domain_name, description) do
    headers = [{"content-type", "application/json"}]

    body =
      Poison.encode!(%{
        "@context": "https://schema.org/extensions",
        "@type": "MessageCard",
        potentialAction: [
          %{
            "@type": "OpenUri",
            name: "View Workflow",
            targets: [
              %{
                os: "default",
                uri: "#{exposed_domain_name}/workflows/#{job.workflow_id}"
              }
            ]
          }
        ],
        sections: [
          %{
            facts: [
              %{
                name: "Posted By:",
                value: "#{exposed_domain_name}"
              },
              %{
                name: "Workflow Reference:",
                value: "#{workflow.reference}"
              },
              %{
                name: "Workflow ID:",
                value: "#{workflow.id}"
              },
              %{
                name: "Workflow Identifier:",
                value: "#{workflow.identifier}"
              },
              %{
                name: "Job ID:",
                value: "#{job.id}"
              },
              %{
                name: "Job:",
                value: "#{job.name}"
              }
            ],
            text: "```#{description}```"
          }
        ],
        summary: "MCAI Workflow Error Notification",
        themeColor: "FF5733",
        title: "MCAI Workflow Error Notification"
      })

    Logger.debug(
      "#{__MODULE__}: POST #{url}, headers: #{inspect(headers)}, body: #{inspect(body)}"
    )

    case HTTPoison.request("POST", url, body, headers) do
      {:ok, %{status_code: 200} = response} ->
        {:ok, response.body}

      {:ok, response} ->
        Logger.error("Unable to notify: #{inspect(response)}")

      {:error, reason} ->
        # Transport failure (DNS, connection refused, timeout, ...): report it
        # instead of crashing the calling process.
        Logger.error("Unable to notify: #{inspect(reason)}")
        {:error, reason}
    end
  end

  @doc """
  Posts the status of a workflow, formatted as a `MessageCard`, to the
  notification hook `workflow_notification_url`.

  When `workflow_endpoint_credentials` is not `nil` it is sent in an
  `Authorization: Basic ...` header. The card links to the workflow under
  `exposed_domain_name` and lists its reference, id, identifier and
  `status_wf`. A `status_wf` of `"error"` produces the error card (red theme,
  fact names ending with a colon); any other value produces the success card.

  Returns the result of `HTTPoison.request/4` untouched.

  The debug log shows the request headers with the `Authorization` value
  redacted, so that credentials are never written to the logs.
  """
  def notification_hook_from_workflow(
        workflow_notification_url,
        workflow,
        exposed_domain_name,
        status_wf,
        workflow_endpoint_credentials
      ) do
    headers =
      if workflow_endpoint_credentials != nil do
        [
          {"content-type", "application/json"},
          {"Authorization", "Basic #{workflow_endpoint_credentials}"}
        ]
      else
        [{"content-type", "application/json"}]
      end

    # The two cards only differ by their fact names, title and theme color.
    {fact_names, title, theme_color} =
      if status_wf == "error" do
        {
          [
            "Posted By:",
            "Workflow Reference:",
            "Workflow ID:",
            "Workflow Identifier:",
            "Workflow Status:"
          ],
          "MCAI Workflow Error Notification",
          "FF5733"
        }
      else
        {
          [
            "Posted By",
            "Workflow Reference",
            "Workflow ID",
            "Workflow Identifier",
            "Workflow Status"
          ],
          "MCAI Workflow Success Notification",
          "339933"
        }
      end

    fact_values = [
      "#{exposed_domain_name}",
      "#{workflow.reference}",
      "#{workflow.id}",
      "#{workflow.identifier}",
      "#{status_wf}"
    ]

    facts =
      fact_names
      |> Enum.zip(fact_values)
      |> Enum.map(fn {name, value} -> %{name: name, value: value} end)

    body =
      Poison.encode!(%{
        "@context": "https://schema.org/extensions",
        "@type": "MessageCard",
        potentialAction: [
          %{
            "@type": "OpenUri",
            name: "View Workflow",
            targets: [
              %{
                os: "default",
                uri: "#{exposed_domain_name}/workflows/#{workflow.id}"
              }
            ]
          }
        ],
        sections: [%{facts: facts}],
        summary: title,
        themeColor: theme_color,
        title: title
      })

    Logger.debug(
      "#{__MODULE__}: POST #{workflow_notification_url}, headers: #{inspect(redact_authorization(headers))}, body: #{inspect(body)}"
    )

    HTTPoison.request("POST", workflow_notification_url, body, headers)
  end

  # Replaces the value of the Authorization header before headers are logged.
  defp redact_authorization(headers) do
    Enum.map(headers, fn
      {"Authorization", _value} -> {"Authorization", "[REDACTED]"}
      header -> header
    end)
  end

  @doc """
  Calls every notification hook of a workflow with the given status and stores
  the outcome of each call.

  For each entry of the workflow's `notification_hooks`, the `"endpoint"` is
  called through `notification_hook_from_workflow/5` (with the entry's
  `"credentials"`), an update is published on the `update_workflow_<id>` topic
  and the first element of the call result is saved under the entry's
  `"status"` key. The updated hooks are then persisted with
  `update_workflow/2`.

  `status_wf` defaults to `"error"`. Returns the second element of the
  `update_workflow/2` result: the updated workflow on success, the changeset
  otherwise.
  """
  def notification_workflow_status(workflow_id, status_wf \\ "error") do
    topic = "update_workflow_" <> Integer.to_string(workflow_id)
    workflow = get_workflow!(workflow_id)
    exposed_domain_name = StepFlow.Configuration.get_exposed_domain_name()

    hooks_with_status =
      Enum.map(workflow.notification_hooks, fn hook ->
        endpoint = StepFlow.Map.get_by_key_or_atom(hook, "endpoint")
        credentials = StepFlow.Map.get_by_key_or_atom(hook, "credentials")

        {status, _} =
          notification_hook_from_workflow(
            endpoint,
            workflow,
            exposed_domain_name,
            status_wf,
            credentials
          )

        StepFlow.Notification.send(topic, %{workflow_id: workflow_id})
        Map.put(hook, "status", status)
      end)

    {_, updated} = update_workflow(workflow, %{notification_hooks: hooks_with_status})
    updated
  end

  @doc """
  Notifies the configured channels about a job.

  When a `description` is given, it is sent to Slack (if a Slack token and a
  channel are configured) and to Teams (if a Teams URL is configured). In all
  cases an update is published on the `update_workflow_<id>` topic of the
  job's workflow; the result of that publication is returned.
  """
  def notification_from_job(job_id, description \\ nil) do
    job = Jobs.get_job!(job_id)
    topic = "update_workflow_" <> Integer.to_string(job.workflow_id)
    channel = StepFlow.Configuration.get_slack_channel()
    workflow = get_workflow!(job.workflow_id)
    exposed_domain_name = StepFlow.Configuration.get_exposed_domain_name()

    slack_enabled? =
      StepFlow.Configuration.get_slack_token() != nil and description != nil and channel != nil

    if slack_enabled? do
      notification_slack_from_job(job, channel, exposed_domain_name, description)
    end

    teams_url = StepFlow.Configuration.get_teams_url()

    if teams_url != nil and description != nil do
      notification_teams_from_job(job, teams_url, workflow, exposed_domain_name, description)
    end

    StepFlow.Notification.send(topic, %{workflow_id: job.workflow_id})
  end

  @doc """
  Marks a Workflow as deleted.

  The record is not removed: its `deleted` attribute is set to `true` through
  `Workflow.changeset/2` and the change is persisted.

  ## Examples

      iex> delete_workflow(workflow)
      {:ok, %Workflow{}}

      iex> delete_workflow(workflow)
      {:error, %Ecto.Changeset{}}

  """
  def delete_workflow(%Workflow{} = workflow) do
    changeset = Workflow.changeset(workflow, %{deleted: true})
    Repo.update(changeset)
  end

  @doc """
  Returns an `%Ecto.Changeset{}` for tracking workflow changes.

  The changeset is built from the given workflow with no attribute changes.

  ## Examples

      iex> change_workflow(workflow)
      %Ecto.Changeset{source: %Workflow{}}

  """
  def change_workflow(%Workflow{} = workflow), do: Workflow.changeset(workflow, %{})

  # Returns statistics about the workflows that produced an artifact within a
  # time window, grouped by workflow identifier.
  #
  # `delta` and `scale` are handed to Ecto's `datetime_add/3` as the count and
  # the interval added to the current UTC time; only artifacts inserted after
  # the resulting instant are considered.
  # NOTE(review): `delta` is presumably negative so the window lies in the
  # past - confirm against the callers.
  #
  # The result is a list of maps with the keys:
  #   * `:identifier` - the workflow identifier of the group
  #   * `:count`      - the number of rows in the group
  #   * `:duration`   - the average, in epoch seconds, of the time between the
  #                     insertion of a workflow and the insertion of its most
  #                     recent artifact in the window
  def get_completed_statistics(scale, delta) do
    query =
      from(
        workflow in Workflow,
        # One row per workflow: the insertion date of its most recent artifact
        # within the time window.
        inner_join:
          artifacts in subquery(
            from(
              artifacts in Artifact,
              where:
                artifacts.inserted_at > datetime_add(^NaiveDateTime.utc_now(), ^delta, ^scale),
              group_by: artifacts.workflow_id,
              select: %{
                workflow_id: artifacts.workflow_id,
                inserted_at: max(artifacts.inserted_at)
              }
            )
          ),
        on: workflow.id == artifacts.workflow_id,
        group_by: workflow.identifier,
        select: %{
          count: count(),
          # Raw SQL fragment; the EXTRACT(EPOCH FROM ...) form looks
          # PostgreSQL-specific - TODO confirm the supported databases.
          duration:
            fragment(
              "EXTRACT(EPOCH FROM (SELECT avg(? - ?)))",
              artifacts.inserted_at,
              workflow.inserted_at
            ),
          identifier: workflow.identifier
        }
      )

    Repo.all(query)
  end

  @doc """
  Returns an attribute map holding the configuration of a workflow.

  The map copies the definition fields of the workflow (schema version,
  identifier, version numbers, tags, flags, notification hooks, reference,
  user, parameters) and its steps, from which the computed `:jobs` and
  `:status` entries are removed.
  """
  def get_attr(workflow) do
    copied_fields = [
      :schema_version,
      :identifier,
      :version_major,
      :version_minor,
      :version_micro,
      :tags,
      :is_live,
      :deleted,
      :notification_hooks,
      :reference,
      :user_uuid,
      :parameters
    ]

    bare_steps = Enum.map(workflow.steps, fn step -> Map.drop(step, [:jobs, :status]) end)

    copied_fields
    |> Map.new(fn field -> {field, Map.fetch!(workflow, field)} end)
    |> Map.put(:steps, bare_steps)
  end

  @doc """
  Converts the version fields of a workflow to a `major.minor.micro` string.
  """
  def get_workflow_version_as_string(workflow) do
    "#{workflow.version_major}.#{workflow.version_minor}.#{workflow.version_micro}"
  end

  @doc """
  Aborts a workflow.

  The jobs of the steps in progress are aborted, the remaining steps are
  skipped, the workflow status is set to `:stopped` and an update is published
  on the `update_workflow_<id>` topic.

  Returns the result of `Workflows.Status.set_workflow_status/2`.
  """
  def abort(workflow) do
    abort_running_step_jobs(workflow.steps, workflow)
    skip_remaining_steps(workflow.steps, workflow)

    result = Workflows.Status.set_workflow_status(workflow.id, :stopped)

    StepFlow.Notification.send(
      "update_workflow_" <> Integer.to_string(workflow.id),
      %{workflow_id: workflow.id}
    )

    result
  end

  @doc """
  Requests the pause of a workflow.

  `action` is recorded with the pause and must be `"resume"` or `"abort"`;
  `trigger_date_time` is a Unix timestamp in milliseconds, stored as a naive
  datetime under `trigger_at`. The workflow status is set to `:pausing` with
  that description and an update is published on the `update_workflow_<id>`
  topic.

  Returns the result of the status update, or
  `{:error, "Unknown action: ..."}` for any other action.
  """
  def pause(workflow, action, trigger_date_time) when action in ["resume", "abort"] do
    trigger_at =
      trigger_date_time
      |> DateTime.from_unix!(:millisecond)
      |> DateTime.to_naive()

    description = %{action: action, trigger_at: trigger_at}
    result = Workflows.Status.set_workflow_status(workflow.id, :pausing, nil, description)

    StepFlow.Notification.send(
      "update_workflow_" <> Integer.to_string(workflow.id),
      %{workflow_id: workflow.id}
    )

    result
  end

  def pause(_workflow, action, _trigger_date_time) do
    {:error, "Unknown action: #{action}"}
  end

  @doc """
  Resumes a workflow.

  The workflow status is first set to `:pending` (a failure of that update
  raises a `MatchError`), then the next step is started. When that succeeds
  the status is set to `:processing`, an update is published on the
  `update_workflow_<id>` topic and the result of the status update is
  returned. Otherwise the value returned by `StepFlow.Step.start_next/1` is
  returned as is.
  """
  def resume(workflow) do
    {:ok, _status} = Workflows.Status.set_workflow_status(workflow.id, :pending)

    with {:ok, _} <- StepFlow.Step.start_next(workflow) do
      result = Workflows.Status.set_workflow_status(workflow.id, :processing)

      StepFlow.Notification.send(
        "update_workflow_" <> Integer.to_string(workflow.id),
        %{workflow_id: workflow.id}
      )

      result
    end
  end

  # Aborts the jobs of every step whose status is :processing. Always returns
  # nil; the steps are handled in list order.
  defp abort_running_step_jobs(steps, workflow) when is_list(steps) do
    Enum.each(steps, fn step ->
      if step.status == :processing do
        StepFlow.Step.abort_step_jobs(workflow, step)
      end
    end)

    nil
  end

  # Skips what is left to run: a :queued step is skipped as a whole, while the
  # jobs of a :paused or :processing step are skipped. Steps in any other
  # status are left alone. Always returns nil.
  # NOTE(review): the step status computed in this module maps queued jobs to
  # :processing, so the :queued branch may only apply to steps whose status was
  # set elsewhere - confirm.
  defp skip_remaining_steps(steps, workflow) when is_list(steps) do
    Enum.each(steps, fn step ->
      cond do
        step.status == :queued -> StepFlow.Step.skip_step(workflow, step)
        step.status in [:paused, :processing] -> StepFlow.Step.skip_step_jobs(workflow, step)
        true -> nil
      end
    end)

    nil
  end
end