lib/step_flow/step/live.ex

defmodule StepFlow.Step.Live do
  @moduledoc """
  The Live step context.
  """

  alias StepFlow.Amqp.CommonEmitter
  alias StepFlow.Jobs
  alias StepFlow.Jobs.Status
  alias StepFlow.LiveWorkers
  alias StepFlow.Repo
  alias StepFlow.Step.Launch
  alias StepFlow.Step.LaunchParams

  def create_job_live([source_path | _source_paths], launch_params) do
    {message, job} = generate_message_live(source_path, launch_params)

    message =
      Map.put(
        message,
        :type,
        "create"
      )

    message = filter_message(message)

    params =
      StepFlow.Map.get_by_key_or_atom(message, :parameters, []) ++
        [%{id: "action", type: "string", value: "create"}]

    message = StepFlow.Map.replace_by_atom(message, :parameters, params)

    case CommonEmitter.publish_json(
           "job_worker_manager",
           LaunchParams.get_step_id(launch_params),
           message
         ) do
      :ok ->
        Status.set_job_status(job.id, "queued")
        {:ok, "started"}

      _ ->
        {:error, "unable to publish message"}
    end
  end

  def update_job_live(job_id) do
    job = Repo.preload(Jobs.get_job(job_id), [:status, :updates, :workflow])
    workflow_jobs = Repo.preload(job.workflow, [:jobs]).jobs

    steps = job.workflow.steps

    start_next_job_live(workflow_jobs, steps)
  end

  defp start_next_job_live([], _step_id), do: {:ok, "nothing to do"}

  defp start_next_job_live([job | jobs], steps) do
    job = Repo.preload(Jobs.get_job(job.id), [:status])

    if job.status != [] do
      case Status.get_last_status_id(job.status).state do
        :ready_to_init -> update_live_worker(steps, job, "initializing")
        :ready_to_start -> update_live_worker(steps, job, "starting")
        :update -> update_live_worker(steps, job, "updating")
        :stopped -> delete_live_worker(steps, job)
        _ -> {:ok, "nothing to do"}
      end
    end

    start_next_job_live(jobs, steps)
  end

  def stop_jobs([]), do: nil

  def stop_jobs([job | jobs]) do
    stop_job(job)

    stop_jobs(jobs)
  end

  def stop_job(job) do
    Jobs.get_message(job)
    |> Map.put(:type, "stop_process")
    |> publish_message(job.step_id)
  end

  defp update_live_worker(steps, job, status) do
    case generate_message(steps, job) do
      {:ok, message} ->
        {:ok, _} = Status.set_job_status(job.id, status)
        publish_message(message, job.step_id)

      _ ->
        {:ok, "nothing to do"}
    end
  end

  defp delete_live_worker_internal(steps, job) do
    {_, message} = generate_message(steps, job)
    message = filter_message(message)

    params =
      StepFlow.Map.get_by_key_or_atom(message, :parameters, []) ++
        [%{id: "action", type: "string", value: "delete"}]

    message = StepFlow.Map.replace_by_atom(message, :parameters, params)

    case CommonEmitter.publish_json(
           "job_worker_manager",
           job.step_id,
           message
         ) do
      :ok -> {:ok, "deleted"}
      _ -> {:error, "unable to publish message"}
    end
  end

  defp delete_live_worker(steps, job, set_status \\ true) do
    case delete_live_worker_internal(steps, job) do
      {:error, "unable to publish message"} ->
        {:error, "unable to publish message"}

      _ ->
        if set_status do
          {:ok, _} = Status.set_job_status(job.id, "deleting")
        end
    end
  end

  def delete_worker_from_job(job) do
    job = Repo.preload(job, [:workflow, :status])

    delete_live_worker(
      job.workflow.steps,
      job,
      false
    )
  end

  def generate_message(steps, job) do
    message = Jobs.get_message(job)

    requirements = get_requirements(steps, job.step_id)

    {result, message} =
      if requirements != nil do
        replace_ip_address(message, job.id, requirements)
      else
        live_worker = LiveWorkers.get_by(%{"job_id" => job.id})

        if live_worker.creation_date == nil || live_worker.instance_id == "" do
          {:error, message}
        else
          {:ok, message}
        end
      end

    action =
      job.status
      |> Status.get_last_status()
      |> Status.get_action()

    {result, Map.put(message, :type, action)}
  end

  defp replace_ip_address(message, job_id, requirements) do
    job = Repo.preload(Jobs.get_job(job_id), [:status, :updates, :workflow])
    workflow = Repo.preload(job.workflow, [:jobs])

    job_req =
      Jobs.internal_list_jobs(%{
        "workflow_id" => workflow.id,
        "step_id" => requirements |> List.first()
      })
      |> Map.get(:data)
      |> List.first()

    live_worker = LiveWorkers.get_by(%{"job_id" => job_req.id})
    ips = live_worker.ips
    port = live_worker.ports |> List.last()
    created = live_worker.creation_date

    if created != nil && ips != [] do
      ip = ips |> List.first()

      params =
        StepFlow.Map.get_by_key_or_atom(message, :parameters, [])
        |> Enum.map(fn param ->
          case StepFlow.Map.get_by_key_or_atom(param, :id) do
            "source_paths" ->
              value = ["srt://#{ip}:#{port}"]
              StepFlow.Map.replace_by_string(param, "value", value)

            "source_path" ->
              value = "srt://#{ip}:#{port}"
              StepFlow.Map.replace_by_string(param, "value", value)

            _ ->
              param
          end
        end)

      Jobs.update_job(job, %{parameters: params})

      {:ok, StepFlow.Map.replace_by_atom(message, :parameters, params)}
    else
      {:error, message}
    end
  end

  defp filter_message(message) do
    Map.put(
      message,
      :parameters,
      Enum.filter(message.parameters, fn x ->
        Enum.member?(
          ["step_id", "namespace", "worker", "ports", "direct_messaging_queue_name"],
          StepFlow.Map.get_by_key_or_atom(x, :id)
        )
      end)
    )
  end

  defp publish_message(message, step_id) do
    case CommonEmitter.publish_json(
           "direct_messaging_" <> get_direct_messaging_queue(message),
           step_id,
           message,
           "direct_messaging",
           headers: [{"instance_id", :longstr, get_instance_id(message)}]
         ) do
      :ok ->
        {:ok, "started"}

      _ ->
        {:error, "unable to publish message"}
    end
  end

  defp get_direct_messaging_queue(message) do
    StepFlow.Map.get_by_key_or_atom(message, :parameters)
    |> Enum.filter(fn param ->
      StepFlow.Map.get_by_key_or_atom(param, :id) == "direct_messaging_queue_name"
    end)
    |> List.first()
    |> StepFlow.Map.get_by_key_or_atom(:value)
  end

  defp get_instance_id(message) do
    job_id = StepFlow.Map.get_by_key_or_atom(message, :job_id)
    LiveWorkers.get_by(%{"job_id" => job_id}).instance_id |> String.slice(0..11)
  end

  defp get_requirements(steps, step_id) do
    case steps do
      [] ->
        nil

      _ ->
        Enum.filter(steps, fn step ->
          StepFlow.Map.get_by_key_or_atom(step, :id) == step_id
        end)
        |> List.first()
        |> StepFlow.Map.get_by_key_or_atom(:parent_ids)
    end
  end

  def generate_message_live(
        source_path,
        launch_params
      ) do
    parameters =
      Launch.generate_job_parameters_one_for_one(
        source_path,
        launch_params
      )

    job_params = %{
      name: LaunchParams.get_step_name(launch_params),
      step_id: LaunchParams.get_step_id(launch_params),
      is_live: true,
      workflow_id: launch_params.workflow.id,
      parameters: parameters
    }

    {:ok, job} = Jobs.create_job(job_params)

    {Jobs.get_message(job), job}
  end
end