# lib/step_flow/workflow_definitions/workflow_definitions.ex

defmodule StepFlow.WorkflowDefinitions do
  @moduledoc """
  The WorkflowDefinitions context.
  """

  import Ecto.Query, warn: false
  alias StepFlow.Repo
  alias StepFlow.Roles
  alias StepFlow.WorkflowDefinitions.ExternalLoader
  alias StepFlow.WorkflowDefinitions.WorkflowDefinition
  require Logger

  @doc """
  Returns a page of Workflow Definitions together with pagination metadata.

  `params` is a map read with string keys:

    * `"page"` - page index, multiplied by the size to get the offset
      (default `0`)
    * `"size"` - page size (default `10`); `-1` returns every row
    * `"mode"` - `"simple"` selects a reduced set of fields, any other value
      (default `"full"`) leaves the selection untouched
    * `"search"` - optional filter on label or identifier
    * `"versions"` - optional version filter, see `filter_by_versions/2`

  The roles found in `params` restrict the result to the workflow identifiers
  for which a "view" right exists.

  Returns a map with the keys `:data`, `:total` (number of matching rows
  before pagination), `:page`, `:size` and `:mode`.
  """
  def list_workflow_definitions(params \\ %{}) do
    authorized_identifiers = check_rights(params)

    page = StepFlow.Integer.force(Map.get(params, "page", 0))
    size = StepFlow.Integer.force(Map.get(params, "size", 10))
    mode = Map.get(params, "mode", "full")

    # Each filter either returns the query untouched or wraps it in a subquery.
    base_query = from(definition in WorkflowDefinition)
    permitted_query = filter_by_rights(base_query, authorized_identifiers)
    searched_query = filter_by_label_or_identifier(permitted_query, Map.get(params, "search"))
    versioned_query = filter_by_versions(searched_query, Map.get(params, "versions"))
    selection = select_by_mode(versioned_query, mode)

    # The total is computed on the filtered rows, before any pagination.
    count_query = from(row in subquery(selection), select: count(row.id))

    total =
      count_query
      |> Repo.all()
      |> List.first()

    sorted_query =
      from(
        row in subquery(selection),
        order_by: [
          desc: row.version_major,
          desc: row.version_minor,
          desc: row.version_micro
        ]
      )

    data =
      sorted_query
      |> paginate(page * size, size)
      |> Repo.all()

    %{data: data, total: total, page: page, size: size, mode: mode}
  end

  # Returns the workflow identifiers the roles found in `params` are allowed to
  # view. Each "view" right on the "workflow" entity type carries an `entity`
  # string; the part after the last "::" is kept. No roles means no rights.
  defp check_rights(params) do
    view_rights =
      params
      |> StepFlow.Map.get_by_key_or_atom(:roles)
      |> Roles.get_roles()
      |> case do
        nil -> []
        roles -> Roles.get_rights_for_entity_type_and_action(roles, "workflow", "view")
      end

    # Rights that do not expose an `entity` key are skipped by the generator.
    for %{entity: entity} <- view_rights do
      entity
      |> String.split("::")
      |> List.last()
    end
  end

  # Restricts `query` to the page starting at `offset` and holding at most
  # `size` rows. A `size` of `-1` means "no pagination" and returns `query`
  # unchanged.
  #
  # The offset and limit are applied to `query` itself rather than to a
  # subquery wrapping it: SQL does not guarantee that the ordering of a derived
  # table is kept by the outer query, so paginating a wrapper would allow the
  # database to return the rows of a page in an arbitrary order.
  defp paginate(query, _offset, -1), do: query

  defp paginate(query, offset, size) do
    from(page in query, offset: ^offset, limit: ^size)
  end

  @doc """
  Restricts `query` to the requested workflow definition versions.

  `versions` can be:

    * `["latest"]` - rows are made distinct on `identifier` while ordered by
      descending major, minor and micro version
    * a non-empty list of strings - only the rows whose
      `"<major>.<minor>.<micro>"` version string is a member of the list are
      kept
    * anything else (including `nil` and `[]`) - `query` is returned unchanged
  """
  def filter_by_versions(query, ["latest"]) do
    from(
      workflow_definition in subquery(query),
      order_by: [
        desc: workflow_definition.version_major,
        desc: workflow_definition.version_minor,
        desc: workflow_definition.version_micro
      ],
      distinct: :identifier
    )
  end

  # Matching on `[_ | _]` checks for a non-empty list in constant time, where
  # a `length/1` guard would walk the whole list.
  def filter_by_versions(query, [_ | _] = versions) do
    from(
      workflow_definition in subquery(query),
      where:
        fragment(
          "concat(?, '.', ?, '.', ?) = ANY(?)",
          workflow_definition.version_major,
          workflow_definition.version_minor,
          workflow_definition.version_micro,
          ^versions
        )
    )
  end

  def filter_by_versions(query, _versions), do: query

  # Keeps the rows whose label contains `search` or whose identifier matches
  # `search`, both case-insensitively. A `nil` search leaves `query` untouched.
  defp filter_by_label_or_identifier(query, nil), do: query

  defp filter_by_label_or_identifier(query, search) do
    label_pattern = "%#{search}%"

    # NOTE(review): the label is matched against the wildcard-wrapped pattern
    # while the identifier is matched against the raw search term — confirm
    # that this asymmetry is intended.
    from(
      definition in subquery(query),
      where:
        ilike(definition.label, ^label_pattern) or
          ilike(definition.identifier, ^search)
    )
  end

  # Keeps the rows whose identifier is listed in `allowed`. The "*" entry acts
  # as a wildcard: when present, `query` is returned without any restriction.
  defp filter_by_rights(query, allowed) do
    case "*" in allowed do
      true ->
        query

      false ->
        from(
          definition in subquery(query),
          where: definition.identifier in ^allowed
        )
    end
  end

  # In "simple" mode only the id, identifier, live flag, label and version
  # fields are selected, as a plain map. Every other mode ("full" included)
  # returns `query` unchanged.
  defp select_by_mode(query, "simple") do
    from(
      definition in subquery(query),
      select: %{
        id: definition.id,
        identifier: definition.identifier,
        is_live: definition.is_live,
        label: definition.label,
        version_major: definition.version_major,
        version_minor: definition.version_minor,
        version_micro: definition.version_micro
      }
    )
  end

  defp select_by_mode(query, _mode), do: query

  @doc """
  Gets the latest version of the WorkflowDefinition named `identifier`.

  The latest version is the first row when ordering by descending major,
  minor and micro version.

  Returns nil if the WorkflowDefinition does not exist.
  """
  def get_workflow_definition(identifier) do
    WorkflowDefinition
    |> where([definition], definition.identifier == ^identifier)
    |> order_by([definition],
      desc: definition.version_major,
      desc: definition.version_minor,
      desc: definition.version_micro
    )
    |> limit(1)
    |> Repo.one()
  end

  @doc """
  Gets the WorkflowDefinition named `identifier` in the exact version
  `version_major`.`version_minor`.`version_micro`.

  Returns nil if the WorkflowDefinition does not exist.
  """
  def get_workflow_definition(identifier, version_major, version_minor, version_micro) do
    # Successive `where` calls are combined with AND.
    WorkflowDefinition
    |> where([definition], definition.identifier == ^identifier)
    |> where([definition], definition.version_major == ^version_major)
    |> where([definition], definition.version_minor == ^version_minor)
    |> where([definition], definition.version_micro == ^version_micro)
    |> limit(1)
    |> Repo.one()
  end

  @doc """
  Loads the valid workflow definitions found in every configured directory.

  Returns them as a single flat list.
  """
  def load_workflows do
    definitions_per_directory =
      for directory <- get_workflow_definition_directories() do
        list_workflow_definitions_for_a_directory(directory)
      end

    List.flatten(definitions_per_directory)
  end

  @doc """
  Loads workflows in database.

  Every definition returned by `load_workflows/0` is inserted unless a
  definition with the same identifier and version is already stored, in which
  case a warning is logged. A failed insert is logged as an error and does not
  stop the processing of the remaining definitions.

  Returns `:ok`.
  """
  def load_workflows_in_database do
    Enum.each(load_workflows(), &insert_workflow_definition_if_missing/1)
  end

  # Inserts one decoded workflow definition when no row with the same
  # identifier and version exists yet.
  defp insert_workflow_definition_if_missing(workflow_definition) do
    identifier = StepFlow.Map.get_by_key_or_atom(workflow_definition, :identifier)

    existing =
      get_workflow_definition(
        identifier,
        StepFlow.Map.get_by_key_or_atom(workflow_definition, :version_major),
        StepFlow.Map.get_by_key_or_atom(workflow_definition, :version_minor),
        StepFlow.Map.get_by_key_or_atom(workflow_definition, :version_micro)
      )

    case existing do
      nil ->
        %WorkflowDefinition{}
        |> WorkflowDefinition.changeset(workflow_definition)
        |> Repo.insert()
        |> case do
          {:ok, _inserted} ->
            :ok

          # The insert result must not be dropped: an invalid changeset would
          # otherwise leave the definition out of the database without a trace.
          {:error, changeset} ->
            Logger.error(
              "Unable to insert workflow definition #{inspect(identifier)}: " <>
                inspect(changeset.errors)
            )
        end

      _ ->
        Logger.warn("Workflow already present in database")
    end
  end

  @doc """
  Returns the list of directories holding the workflow definitions.

  The value is read from the `:workflow_definition` entry of the `StepFlow`
  configuration of the `:step_flow` application, which can be:

    * `{:system, name}` - the environment variable `name` holds the
      directories, separated by `:` on Unix-like systems and `;` elsewhere
    * a list - returned as is
    * a binary - used as the single directory

  Any other value, a missing configuration or an unset environment variable is
  logged and yields an empty list.
  """
  def get_workflow_definition_directories do
    :step_flow
    # Default to an empty keyword list so that a missing configuration reaches
    # the logging fallback below instead of crashing in `Keyword.get/2`.
    |> Application.get_env(StepFlow, [])
    |> Keyword.get(:workflow_definition)
    |> case do
      {:system, key} ->
        directories_from_env(key)

      key when is_list(key) ->
        key

      key when is_binary(key) ->
        [key]

      key ->
        Logger.info("unable to use #{inspect(key)} to list directory")
        []
    end
  end

  # Splits the content of the environment variable `key` into directories.
  # `System.get_env/1` returns nil for an unset variable, which cannot be
  # split: that case is logged and treated as "no directory".
  defp directories_from_env(key) do
    case System.get_env(key) do
      nil ->
        Logger.info("unable to use unset environment variable #{inspect(key)} to list directory")
        []

      value ->
        String.split(value, get_separator())
    end
  end

  @doc """
  Tells whether `definition` conforms to the workflow definition JSON schema,
  as reported by `JsonXema.valid?/2`.
  """
  def valid?(definition), do: JsonXema.valid?(get_schema(), definition)

  @doc """
  Validates `definition` against the workflow definition JSON schema and
  returns the result of `JsonXema.validate/2`.
  """
  def validate(definition) do
    schema = get_schema()
    JsonXema.validate(schema, definition)
  end

  # Builds the JsonXema schema used to validate workflow definitions.
  #
  # The schema location comes from the `:workflow_schema_url` configuration
  # entry (with a media-cloud.ai URL as default); its content is loaded,
  # decoded from JSON and checked against the draft-07 meta-schema before
  # being compiled. Nothing is cached: every call loads the schema again.
  defp get_schema do
    schema_location =
      Application.get_env(
        StepFlow.WorkflowDefinitions.WorkflowDefinition,
        :workflow_schema_url,
        "https://media-cloud.ai/standard/1.10/workflow-definition.schema.json"
      )

    decoded_schema = Jason.decode!(load_content(schema_location))

    # Crash on purpose when the schema itself is not a valid draft-07 schema.
    :ok =
      JsonXema.SchemaValidator.validate(
        "http://json-schema.org/draft-07/schema#",
        decoded_schema
      )

    JsonXema.new(decoded_schema, loader: ExternalLoader)
  end

  # Returns the raw content designated by `source`: the body of the HTTP
  # response when `source` starts with "http://" or "https://", the content of
  # the local file named `source` otherwise. Failures raise.
  defp load_content(source) do
    case source do
      "http://" <> _rest -> Map.get(HTTPoison.get!(source), :body)
      "https://" <> _rest -> Map.get(HTTPoison.get!(source), :body)
      _local_path -> File.read!(source)
    end
  end

  # Separator used to split a list of directories held in a single environment
  # variable: ":" when the OS family is :unix, ";" otherwise.
  defp get_separator do
    case :os.type() do
      {:unix, _name} -> ":"
      _other -> ";"
    end
  end

  # Reads every `.json` file of `directory`, decodes it and returns the decoded
  # workflow definitions that conform to the workflow definition schema.
  # Definitions that do not conform are logged and left out.
  #
  # The schema is built once for the whole directory, and only when there is at
  # least one definition to check: `get_schema/0` loads and compiles the schema
  # on every call, so building it per file would repeat that work needlessly.
  defp list_workflow_definitions_for_a_directory(directory) do
    definitions =
      directory
      |> File.ls!()
      |> Enum.filter(&String.ends_with?(&1, ".json"))
      |> Enum.map(fn filename ->
        directory
        |> Path.join(filename)
        |> File.read!()
        |> Jason.decode!()
      end)

    case definitions do
      [] ->
        []

      _ ->
        schema = get_schema()
        Enum.filter(definitions, &conforms_to_schema?(schema, &1))
    end
  end

  # Validates one decoded definition against `schema` with a single
  # `JsonXema.validate/2` call; when it fails, the path of every validation
  # error is collected and logged.
  defp conforms_to_schema?(schema, workflow_definition) do
    case JsonXema.validate(schema, workflow_definition) do
      :ok ->
        true

      failure ->
        errors =
          JsonXema.ValidationError.travers_errors(failure, [], fn _error, path, acc ->
            ["at " <> inspect(path) | acc]
          end)

        Logger.error(
          "Workflow definition #{inspect(Map.get(workflow_definition, "identifier"))} not valid: #{inspect(errors)}"
        )

        false
    end
  end
end