defmodule StepFlow.Step.Live do
@moduledoc """
The Live step context.
"""
alias StepFlow.Amqp.CommonEmitter
alias StepFlow.Jobs
alias StepFlow.Jobs.Status
alias StepFlow.LiveWorkers
alias StepFlow.Repo
alias StepFlow.Step.LaunchJobs
alias StepFlow.Step.LaunchParams
require Logger
def create_job_live([source_path | _source_paths], launch_params) do
{message, job} = generate_message_live(source_path, launch_params)
message =
Map.put(
message,
:type,
"create"
)
message = filter_message(message)
params =
StepFlow.Map.get_by_key_or_atom(message, :parameters, []) ++
[%{id: "action", type: "string", value: "create"}]
message = StepFlow.Map.replace_by_atom(message, :parameters, params)
datetime = NaiveDateTime.to_string(DateTime.utc_now())
case CommonEmitter.publish_json(
"job_worker_manager",
LaunchParams.get_step_id(launch_params),
message
) do
:ok ->
Status.set_job_status(job.id, "queued", %{}, datetime)
{:ok, "started"}
_ ->
{:error, "unable to publish message"}
end
end
def update_job_live(job_id) do
job = Repo.preload(Jobs.get_job(job_id), [:status, :updates, :workflow])
workflow_jobs = Repo.preload(job.workflow, [:jobs]).jobs
steps = job.workflow.steps
start_next_job_live(workflow_jobs, steps)
end
defp start_next_job_live([], _step_id), do: {:ok, "nothing to do"}
defp start_next_job_live([job | jobs], steps) do
job = Repo.preload(Jobs.get_job(job.id), [:status])
if job.status != [] do
case StepFlow.Controllers.Jobs.get_last_status_id(job.status).state do
:ready_to_init -> update_live_worker(steps, job, "initializing")
:ready_to_start -> update_live_worker(steps, job, "starting")
:update -> update_live_worker(steps, job, "updating")
:stopped -> delete_live_worker(steps, job)
_ -> {:ok, "nothing to do"}
end
end
start_next_job_live(jobs, steps)
end
def stop_jobs([]), do: nil
def stop_jobs([job | jobs]) do
live_worker = LiveWorkers.get_by(%{"job_id" => job.id})
case StepFlow.Controllers.Jobs.get_last_status(job.status).state do
# Case where all jobs are crashing while queued is not fully tested nor probably handled
x when x in [:error, :queued] ->
if not is_terminated(live_worker) do
delete_worker_from_job(job, false)
end
_ ->
case Status.set_job_status(job.id, "stopped") do
{:ok, _} -> delete_worker_from_job(job, true)
_ -> Logger.warn("Cannot stop this job")
end
end
stop_jobs(jobs)
end
defp is_terminated(live_worker) do
case live_worker do
nil ->
false
_ ->
case live_worker.termination_date do
nil -> false
_ -> true
end
end
end
defp update_live_worker(steps, job, status) do
case generate_message(steps, job) do
{:ok, message} ->
{:ok, _} = Status.set_job_status(job.id, status)
publish_message(message, job.step_id)
_ ->
{:ok, "nothing to do"}
end
end
defp delete_live_worker_internal(steps, job) do
case generate_message(steps, job) do
{:ok, message} ->
message = filter_message(message)
params =
StepFlow.Map.get_by_key_or_atom(message, :parameters, []) ++
[%{id: "action", type: "string", value: "delete"}]
message = StepFlow.Map.replace_by_atom(message, :parameters, params)
case CommonEmitter.publish_json(
"job_worker_manager",
job.step_id,
message
) do
:ok -> {:ok, "deleted"}
_ -> {:error, "unable to publish message"}
end
{:error, _} ->
{:error, "unable to delete live worker"}
end
end
defp delete_live_worker(steps, job, set_status \\ true) do
case delete_live_worker_internal(steps, job) do
{:error, "unable to publish message"} ->
{:error, "unable to publish message"}
_ ->
if set_status do
{:ok, _} = Status.set_job_status(job.id, "deleting")
end
end
end
defp delete_worker_from_job(job, set_status) do
job = Repo.preload(job, [:workflow, :status])
delete_live_worker(
job.workflow.steps,
job,
set_status
)
end
def generate_message(steps, job) do
message = StepFlow.Controllers.Jobs.get_message(job)
requirements = get_requirements(steps, job.step_id)
{result, message} =
if requirements != nil do
replace_ip_address(message, job.id, requirements)
else
case LiveWorkers.get_by(%{"job_id" => job.id}) do
nil ->
{:error, message}
live_worker ->
if live_worker.creation_date == nil || live_worker.instance_id == "" do
{:error, message}
else
{:ok, message}
end
end
end
action =
job.status
|> StepFlow.Controllers.Jobs.get_last_status()
|> StepFlow.Controllers.Jobs.get_action()
{result, Map.put(message, :type, action)}
end
defp replace_ip_address(message, job_id, requirements) do
job = Repo.preload(Jobs.get_job(job_id), [:status, :updates, :workflow])
workflow = Repo.preload(job.workflow, [:jobs])
job_req =
Jobs.internal_list_jobs(%{
"workflow_id" => workflow.id,
"step_id" => requirements |> List.first()
})
|> Map.get(:data)
|> List.first()
live_worker = LiveWorkers.get_by(%{"job_id" => job_req.id})
if live_worker != nil do
ips = StepFlow.Map.get_by_key_or_atom(live_worker, :ips, [])
port = StepFlow.Map.get_by_key_or_atom(live_worker, :ports, []) |> List.last()
created = live_worker.creation_date
if created != nil && ips != [] do
ip = ips |> List.first()
params =
StepFlow.Map.get_by_key_or_atom(message, :parameters, [])
|> Enum.map(fn param ->
case StepFlow.Map.get_by_key_or_atom(param, :id) do
"source_paths" ->
value = ["srt://#{ip}:#{port}"]
StepFlow.Map.replace_by_string(param, "value", value)
"source_path" ->
value = "srt://#{ip}:#{port}"
StepFlow.Map.replace_by_string(param, "value", value)
_ ->
param
end
end)
Jobs.update_job(job, %{parameters: params})
{:ok, StepFlow.Map.replace_by_atom(message, :parameters, params)}
else
{:error, message}
end
else
{:error, message}
end
end
defp filter_message(message) do
Map.put(
message,
:parameters,
Enum.filter(message.parameters, fn x ->
Enum.member?(
["step_id", "namespace", "worker", "ports", "direct_messaging_queue_name"],
StepFlow.Map.get_by_key_or_atom(x, :id)
)
end)
)
end
defp publish_message(message, step_id) do
case get_instance_id(message) do
{:error, _} ->
job_id = StepFlow.Map.get_by_key_or_atom(message, :job_id)
Logger.warning("No live worker associated to #{job_id}")
{:ok, instance_id} ->
case CommonEmitter.publish_json(
"direct_messaging_" <> get_direct_messaging_queue(message),
step_id,
message,
"direct_messaging",
headers: [{"instance_id", :longstr, instance_id}]
) do
:ok ->
{:ok, "started"}
_ ->
{:error, "unable to publish message"}
end
end
end
defp get_direct_messaging_queue(message) do
StepFlow.Map.get_by_key_or_atom(message, :parameters)
|> Enum.filter(fn param ->
StepFlow.Map.get_by_key_or_atom(param, :id) == "direct_messaging_queue_name"
end)
|> List.first()
|> StepFlow.Map.get_by_key_or_atom(:value)
end
defp get_instance_id(message) do
job_id = StepFlow.Map.get_by_key_or_atom(message, :job_id)
case LiveWorkers.get_by(%{"job_id" => job_id}) do
nil ->
{:error, nil}
live_worker ->
{:ok, live_worker.instance_id |> String.slice(0..11)}
end
end
defp get_requirements(steps, step_id) do
case steps do
[] ->
nil
_ ->
Enum.filter(steps, fn step ->
StepFlow.Map.get_by_key_or_atom(step, :id) == step_id
end)
|> List.first()
|> StepFlow.Map.get_by_key_or_atom(:parent_ids)
end
end
def generate_message_live(
source_path,
launch_params
) do
parameters =
LaunchJobs.generate_job_parameters_one_for_one(
source_path,
launch_params
)
job_params = %{
name: LaunchParams.get_step_name(launch_params),
step_id: LaunchParams.get_step_id(launch_params),
is_live: true,
workflow_id: launch_params.workflow.id,
parameters: parameters
}
{:ok, job} = Jobs.create_job(job_params)
{StepFlow.Controllers.Jobs.get_message(job), job}
end
end