# lib/step_flow/controllers/step/live.ex

defmodule StepFlow.Step.Live do
  @moduledoc """
  The Live step context.
  """

  alias StepFlow.Amqp.CommonEmitter
  alias StepFlow.Jobs
  alias StepFlow.Jobs.Status
  alias StepFlow.LiveWorkers
  alias StepFlow.Repo
  alias StepFlow.Step.LaunchJobs
  alias StepFlow.Step.LaunchParams
  require Logger

  @doc """
  Creates a live job for the first source path and asks the worker manager to
  create the matching live worker.

  Only the head of the source paths list is used. The job message is tagged
  with the `"create"` type, reduced to the parameters kept by the message
  filter, completed with an `"action"` parameter set to `"create"` and
  published on the `"job_worker_manager"` queue.

  Returns `{:ok, "started"}` when the message is published (the job status is
  then set to `"queued"`), `{:error, "unable to publish message"}` otherwise.
  """
  def create_job_live([source_path | _source_paths], launch_params) do
    {job_message, job} = generate_message_live(source_path, launch_params)

    filtered = job_message |> Map.put(:type, "create") |> filter_message()

    create_action = %{id: "action", type: "string", value: "create"}
    parameters = StepFlow.Map.get_by_key_or_atom(filtered, :parameters, []) ++ [create_action]
    payload = StepFlow.Map.replace_by_atom(filtered, :parameters, parameters)

    # The status date is captured before the message is published.
    queued_at = NaiveDateTime.to_string(DateTime.utc_now())
    step_id = LaunchParams.get_step_id(launch_params)

    if CommonEmitter.publish_json("job_worker_manager", step_id, payload) == :ok do
      Status.set_job_status(job.id, "queued", %{}, queued_at)
      {:ok, "started"}
    else
      {:error, "unable to publish message"}
    end
  end

  @doc """
  Visits every job of the workflow owning `job_id` and triggers the live
  worker action matching the last status of each of them.

  Returns `{:ok, "nothing to do"}` once every job has been visited.
  """
  def update_job_live(job_id) do
    job =
      job_id
      |> Jobs.get_job()
      |> Repo.preload([:status, :updates, :workflow])

    workflow_with_jobs = Repo.preload(job.workflow, [:jobs])

    start_next_job_live(workflow_with_jobs.jobs, job.workflow.steps)
  end

  # Walks through the workflow jobs one by one. Each job is reloaded with its
  # statuses, and the state of its last status selects the live worker action:
  # an update message (initializing / starting / updating) or a deletion.
  # The outcome of that action is not used; the walk always ends with
  # `{:ok, "nothing to do"}`.
  defp start_next_job_live([], _steps), do: {:ok, "nothing to do"}

  defp start_next_job_live([current | remaining], steps) do
    job = current.id |> Jobs.get_job() |> Repo.preload([:status])

    # Jobs without any status yet are skipped.
    if job.status != [] do
      last_status = StepFlow.Controllers.Jobs.get_last_status_id(job.status)

      case last_status.state do
        :stopped -> delete_live_worker(steps, job)
        :ready_to_init -> update_live_worker(steps, job, "initializing")
        :ready_to_start -> update_live_worker(steps, job, "starting")
        :update -> update_live_worker(steps, job, "updating")
        _ -> {:ok, "nothing to do"}
      end
    end

    start_next_job_live(remaining, steps)
  end

  @doc """
  Stops the given jobs one after the other.

  The jobs must have their `status` association loaded.

  - When the last status of a job is `:error` or `:queued`, its live worker is
    deleted without changing the job status, unless the live worker already
    has a termination date.
  - Otherwise the job status is set to `"stopped"` and, on success, its live
    worker is deleted; a warning is logged when the status cannot be set.

  Returns `nil` once the whole list has been processed.
  """
  def stop_jobs([]), do: nil

  def stop_jobs([job | jobs]) do
    case StepFlow.Controllers.Jobs.get_last_status(job.status).state do
      # Case where all jobs are crashing while queued is not fully tested nor probably handled
      state when state in [:error, :queued] ->
        # The live worker is only needed in this branch, so it is only looked up here.
        live_worker = LiveWorkers.get_by(%{"job_id" => job.id})

        if not is_terminated(live_worker) do
          delete_worker_from_job(job, false)
        end

      _ ->
        case Status.set_job_status(job.id, "stopped") do
          {:ok, _} -> delete_worker_from_job(job, true)
          # `Logger.warn/1` is deprecated in favour of `Logger.warning/1`.
          _ -> Logger.warning("Cannot stop this job")
        end
    end

    stop_jobs(jobs)
  end

  # Tells whether the live worker has a termination date.
  # A missing live worker (`nil`) is considered as not terminated.
  defp is_terminated(nil), do: false
  defp is_terminated(live_worker), do: live_worker.termination_date != nil

  # Sets the job status to `status` and sends the job message directly to its
  # live worker. When the message cannot be generated, nothing is sent and
  # `{:ok, "nothing to do"}` is returned.
  defp update_live_worker(steps, job, status) do
    with {:ok, message} <- generate_message(steps, job) do
      # A failure to store the status is not expected and raises.
      {:ok, _} = Status.set_job_status(job.id, status)
      publish_message(message, job.step_id)
    else
      _ -> {:ok, "nothing to do"}
    end
  end

  # Asks the worker manager to delete the live worker of the job.
  #
  # The generated job message is reduced to the parameters kept by the message
  # filter, completed with an `"action"` parameter set to `"delete"` and
  # published on the `"job_worker_manager"` queue.
  #
  # Returns `{:ok, "deleted"}`, `{:error, "unable to publish message"}` or
  # `{:error, "unable to delete live worker"}` when no message can be generated.
  defp delete_live_worker_internal(steps, job) do
    case generate_message(steps, job) do
      {:error, _} ->
        {:error, "unable to delete live worker"}

      {:ok, generated} ->
        filtered = filter_message(generated)
        delete_action = %{id: "action", type: "string", value: "delete"}

        parameters =
          StepFlow.Map.get_by_key_or_atom(filtered, :parameters, []) ++ [delete_action]

        payload = StepFlow.Map.replace_by_atom(filtered, :parameters, parameters)

        if CommonEmitter.publish_json("job_worker_manager", job.step_id, payload) == :ok do
          {:ok, "deleted"}
        else
          {:error, "unable to publish message"}
        end
    end
  end

  # Requests the deletion of the live worker of the job and, when `set_status`
  # is truthy, sets the job status to `"deleting"`.
  #
  # A publication failure is returned as is and leaves the status untouched.
  # Any other outcome of the deletion request still updates the status.
  # Returns the status update result, or `nil` when no status is set.
  defp delete_live_worker(steps, job, set_status \\ true) do
    outcome = delete_live_worker_internal(steps, job)

    cond do
      outcome == {:error, "unable to publish message"} ->
        outcome

      set_status ->
        {:ok, _} = Status.set_job_status(job.id, "deleting")

      true ->
        nil
    end
  end

  # Loads the workflow and statuses of the job, then deletes its live worker
  # using the steps of that workflow.
  defp delete_worker_from_job(job, set_status) do
    loaded_job = Repo.preload(job, [:workflow, :status])

    delete_live_worker(loaded_job.workflow.steps, loaded_job, set_status)
  end

  @doc """
  Builds the message of `job` and tells whether it is ready to be sent.

  When the step of the job declares `parent_ids`, the source parameters of the
  message are replaced by the address of the live worker of the parent step.
  Otherwise the message is only considered ready when a live worker is
  registered for the job, with a creation date and a non-empty instance id.

  The `:type` of the returned message is the action derived from the last
  status of the job.

  Returns `{:ok, message}` or `{:error, message}`.
  """
  def generate_message(steps, job) do
    base_message = StepFlow.Controllers.Jobs.get_message(job)

    {result, message} =
      case get_requirements(steps, job.step_id) do
        nil ->
          live_worker = LiveWorkers.get_by(%{"job_id" => job.id})

          cond do
            live_worker == nil ->
              {:error, base_message}

            live_worker.creation_date == nil || live_worker.instance_id == "" ->
              {:error, base_message}

            true ->
              {:ok, base_message}
          end

        requirements ->
          replace_ip_address(base_message, job.id, requirements)
      end

    last_status = StepFlow.Controllers.Jobs.get_last_status(job.status)
    action = StepFlow.Controllers.Jobs.get_action(last_status)

    {result, Map.put(message, :type, action)}
  end

  # Rewrites the source parameters of `message` so that they target the live
  # worker of the required (parent) step.
  #
  # The job of the first step listed in `requirements` is looked up in the
  # workflow of `job_id`. When its live worker has a creation date and exposes
  # at least one IP, the `"source_path"` / `"source_paths"` parameters are
  # replaced by an `srt://ip:port` URL (first IP, last port); the new
  # parameters are stored on the job and set on the returned message.
  #
  # Returns `{:ok, updated_message}`, or `{:error, message}` with the message
  # left untouched when the required job or a usable live worker is missing.
  defp replace_ip_address(message, job_id, requirements) do
    job = Repo.preload(Jobs.get_job(job_id), [:status, :updates, :workflow])

    # Only the workflow identifier is needed here, so its jobs are not preloaded.
    required_job =
      %{"workflow_id" => job.workflow.id, "step_id" => List.first(requirements)}
      |> Jobs.internal_list_jobs()
      |> Map.get(:data)
      |> List.first()

    # The required job may not exist: report an error instead of raising on `nil.id`.
    live_worker =
      if required_job != nil, do: LiveWorkers.get_by(%{"job_id" => required_job.id})

    case live_worker_srt_url(live_worker) do
      nil ->
        {:error, message}

      url ->
        params =
          message
          |> StepFlow.Map.get_by_key_or_atom(:parameters, [])
          |> Enum.map(fn param ->
            case StepFlow.Map.get_by_key_or_atom(param, :id) do
              "source_paths" -> StepFlow.Map.replace_by_string(param, "value", [url])
              "source_path" -> StepFlow.Map.replace_by_string(param, "value", url)
              _ -> param
            end
          end)

        Jobs.update_job(job, %{parameters: params})

        {:ok, StepFlow.Map.replace_by_atom(message, :parameters, params)}
    end
  end

  # Builds the SRT URL of a live worker from its first IP and last port.
  # Returns `nil` when the live worker is missing, has no creation date or
  # exposes no IP.
  defp live_worker_srt_url(nil), do: nil

  defp live_worker_srt_url(live_worker) do
    ips = StepFlow.Map.get_by_key_or_atom(live_worker, :ips, [])
    port = live_worker |> StepFlow.Map.get_by_key_or_atom(:ports, []) |> List.last()

    if live_worker.creation_date != nil && ips != [] do
      "srt://#{List.first(ips)}:#{port}"
    end
  end

  # Keeps only the parameters of the message whose id is one of the ids
  # listed below; the rest of the message is left untouched.
  defp filter_message(message) do
    kept_ids = ["step_id", "namespace", "worker", "ports", "direct_messaging_queue_name"]

    kept_parameters =
      for parameter <- message.parameters,
          StepFlow.Map.get_by_key_or_atom(parameter, :id) in kept_ids,
          do: parameter

    Map.put(message, :parameters, kept_parameters)
  end

  # Sends the message to the live worker of the job through its direct
  # messaging queue, with the instance id of the live worker passed as an
  # `"instance_id"` header.
  #
  # Returns `{:ok, "started"}` or `{:error, "unable to publish message"}`.
  # When no live worker is registered for the job, a warning is logged and
  # nothing is published.
  defp publish_message(message, step_id) do
    case get_instance_id(message) do
      {:ok, instance_id} ->
        queue = "direct_messaging_" <> get_direct_messaging_queue(message)
        options = [headers: [{"instance_id", :longstr, instance_id}]]

        published =
          CommonEmitter.publish_json(queue, step_id, message, "direct_messaging", options)

        if published == :ok do
          {:ok, "started"}
        else
          {:error, "unable to publish message"}
        end

      {:error, _} ->
        job_id = StepFlow.Map.get_by_key_or_atom(message, :job_id)
        Logger.warning("No live worker associated to #{job_id}")
    end
  end

  # Returns the value of the first `"direct_messaging_queue_name"` parameter
  # of the message.
  defp get_direct_messaging_queue(message) do
    message
    |> StepFlow.Map.get_by_key_or_atom(:parameters)
    |> Enum.find(fn parameter ->
      StepFlow.Map.get_by_key_or_atom(parameter, :id) == "direct_messaging_queue_name"
    end)
    |> StepFlow.Map.get_by_key_or_atom(:value)
  end

  # Looks up the live worker of the job referenced by the message.
  # Returns `{:ok, id}` with the instance id cut down to its first 12
  # characters, or `{:error, nil}` when no live worker is registered.
  defp get_instance_id(message) do
    live_worker =
      LiveWorkers.get_by(%{"job_id" => StepFlow.Map.get_by_key_or_atom(message, :job_id)})

    if live_worker == nil do
      {:error, nil}
    else
      {:ok, String.slice(live_worker.instance_id, 0..11)}
    end
  end

  # Returns the `parent_ids` of the first step whose id is `step_id`,
  # or `nil` when the list of steps is empty.
  defp get_requirements([], _step_id), do: nil

  defp get_requirements(steps, step_id) do
    steps
    |> Enum.find(fn step -> StepFlow.Map.get_by_key_or_atom(step, :id) == step_id end)
    |> StepFlow.Map.get_by_key_or_atom(:parent_ids)
  end

  @doc """
  Creates the live job of the step described by `launch_params` for the given
  source path.

  The job is flagged with `is_live: true` and attached to the workflow of the
  launch parameters. Raises when the job cannot be created.

  Returns `{message, job}`, where `message` is the message built from the new
  job.
  """
  def generate_message_live(source_path, launch_params) do
    job_parameters =
      LaunchJobs.generate_job_parameters_one_for_one(source_path, launch_params)

    {:ok, job} =
      Jobs.create_job(%{
        name: LaunchParams.get_step_name(launch_params),
        step_id: LaunchParams.get_step_id(launch_params),
        is_live: true,
        workflow_id: launch_params.workflow.id,
        parameters: job_parameters
      })

    message = StepFlow.Controllers.Jobs.get_message(job)

    {message, job}
  end
end