defmodule StepFlow.WorkflowEventsController do
use StepFlow, :controller
require Logger
import StepFlow.Controller.Helpers
alias StepFlow.{
Amqp.CommonEmitter,
Jobs,
Jobs.Status,
Notifications.Notification,
Repo,
Step.Helpers,
Step.Launch,
Step.Live,
Updates,
Workflows
}
action_fallback(StepFlow.FallbackController)
@doc """
Dispatches a workflow event request to the matching private handler.

The first clause requires a `current_user` in the connection assigns and a
`"workflow_id"` request parameter. The workflow is loaded with
`Workflows.get_workflow!/1`, then the `"event"` parameter selects the action:

  * `"abort"` - only when `workflow.is_live` is `false`
  * `"pause"` - requires `"post_action"` and `"trigger_at"`
  * `"resume"`
  * `"update"` - requires `"job_id"` and `"parameters"`
  * `"retry"` - requires `"job_id"`
  * `"stop"` - only when `workflow.is_live` is `true`
  * `"delete"`

Any other combination is answered with a 422 `"event is not supported"`
response. Requests that do not match the first clause receive a `:forbidden`
JSON error.
"""
def handle(
      %Plug.Conn{assigns: %{current_user: user}} = conn,
      %{"workflow_id" => workflow_id} = params
    ) do
  workflow = Workflows.get_workflow!(workflow_id)
  dispatch_event(params, workflow.is_live, conn, workflow, user)
end

def handle(conn, _params) do
  body = %{status: "error", message: "Forbidden to handle workflow with this identifier"}

  conn
  |> put_status(:forbidden)
  |> json(body)
end

# Routes an event to its handler. Clauses are tried top to bottom; the second
# argument is the workflow's `is_live` flag, which gates "abort" and "stop".
defp dispatch_event(%{"event" => "abort"}, false, conn, workflow, user),
  do: abort(conn, workflow, user)

defp dispatch_event(
       %{"event" => "pause", "post_action" => action, "trigger_at" => trigger_date_time},
       _is_live,
       conn,
       workflow,
       user
     ),
     do: pause(conn, workflow, user, action, trigger_date_time)

defp dispatch_event(%{"event" => "resume"}, _is_live, conn, workflow, user),
  do: resume(conn, workflow, user)

defp dispatch_event(
       %{"event" => "update", "job_id" => job_id, "parameters" => parameters},
       _is_live,
       conn,
       workflow,
       user
     ),
     do: update(conn, workflow, user, job_id, parameters)

defp dispatch_event(%{"event" => "retry", "job_id" => job_id}, _is_live, conn, workflow, user),
  do: retry(conn, workflow, user, job_id)

defp dispatch_event(%{"event" => "stop"}, true, conn, workflow, user),
  do: stop(conn, workflow, user)

defp dispatch_event(%{"event" => "delete"}, _is_live, conn, workflow, user),
  do: delete(conn, workflow, user)

# Unknown event, missing companion parameters, or wrong live/non-live state.
defp dispatch_event(_params, _is_live, conn, _workflow, _user),
  do: send_resp(conn, 422, "event is not supported")
# Retries a job whose last status state is `:error` or `:stopped`.
#
# Clause 1 handles jobs named "job_notification": the notification step is
# processed again through `Notification.process/6`.
# Clause 2 handles every other job name: the job is re-published through
# `CommonEmitter.publish_json/3`.
# Clause 3 rejects any other last status state with a `:forbidden` response.
defp internal_retry_handle(conn, workflow, job, "job_notification", state, user_uuid)
     when state in [:error, :stopped] do
  mark_job_retrying(workflow, job, user_uuid)

  # The workflow returned with the step definition is the one used from here on.
  %{step: step, workflow: step_workflow} = Workflows.get_step_definition(job)
  dates = Helpers.get_dates()
  source_paths = Launch.get_source_paths(step_workflow, step, dates)
  step_name = StepFlow.Map.get_by_key_or_atom(step, :name)
  step_id = StepFlow.Map.get_by_key_or_atom(step, :id)

  {:ok, _} =
    Notification.process(step_workflow, dates, step_name, step, step_id, source_paths)

  json(put_status(conn, :ok), %{status: "ok"})
end

defp internal_retry_handle(conn, workflow, job, _job_name, state, user_uuid)
     when state in [:error, :stopped] do
  mark_job_retrying(workflow, job, user_uuid)

  message = %{job_id: job.id, parameters: job.parameters}

  reply =
    case CommonEmitter.publish_json(job.name, job.step_id, message) do
      :ok ->
        StepFlow.Notification.send("retry_job", %{workflow_id: workflow.id, body: message})
        %{status: "ok"}

      _ ->
        %{status: "error", message: "unable to publish message"}
    end

  # NOTE(review): the HTTP status is `:ok` even when publishing failed; only
  # the JSON body reports the error. Confirm clients rely on this before
  # changing it.
  conn
  |> put_status(:ok)
  |> json(reply)
end

defp internal_retry_handle(conn, _workflow, _job, _job_name, _state, _user_uuid) do
  send_resp(conn, :forbidden, "illegal operation")
end

# Records the `:retrying` job status for `user_uuid` and the matching
# `:job_retrying` workflow status. Either step failing to return `{:ok, _}`
# raises a `MatchError`.
defp mark_job_retrying(workflow, job, user_uuid) do
  {:ok, job_status} = Status.set_job_status(job.id, :retrying, %{user_uuid: user_uuid})

  {:ok, _status} =
    Workflows.Status.define_workflow_status(workflow.id, :job_retrying, job_status)

  :ok
end
# Updates the parameters of job `job_id` on behalf of `user`.
#
# The user needs the "update" right on "workflow::<identifier>". The job is
# loaded with `Jobs.get_job_with_status!/1` and is only modified when its
# `is_updatable` flag is truthy. Responds with:
#   * `:ok` and `%{status: "ok"}` once `Updates.update_parameters/2` was called
#   * `:forbidden` when the job is not updatable
#   * `:forbidden` when the user lacks the right
#
# NOTE(review): nothing here checks that the job belongs to `workflow`;
# confirm whether that is enforced elsewhere.
defp update(conn, workflow, user, job_id, parameters) do
  if has_right?("workflow::" <> workflow.identifier, user, "update") do
    Logger.warn("update job #{job_id}")
    job = Jobs.get_job_with_status!(job_id)

    if job.is_updatable do
      Updates.update_parameters(job, parameters)

      conn
      |> put_status(:ok)
      |> json(%{status: "ok"})
    else
      Logger.error("Job #{job_id} cannot be updated !")

      # `:error` is not a status atom Plug knows, so `put_status/2` raised
      # here instead of answering. `:forbidden` matches the message below.
      conn
      |> put_status(:forbidden)
      |> json(%{status: "error", message: "Forbidden to update this job"})
    end
  else
    conn
    |> put_status(:forbidden)
    |> json(%{status: "error", message: "Forbidden to update this workflow"})
  end
end
# Aborts `workflow` via `Workflows.abort/1` when `user` holds the "abort"
# right on "workflow::<identifier>"; otherwise answers with `:forbidden`.
# The return value of `Workflows.abort/1` is not inspected.
defp abort(conn, workflow, user) do
  resource = "workflow::" <> workflow.identifier

  cond do
    has_right?(resource, user, "abort") ->
      Workflows.abort(workflow)
      json(put_status(conn, :ok), %{status: "ok"})

    true ->
      json(
        put_status(conn, :forbidden),
        %{status: "error", message: "Forbidden to abort this workflow"}
      )
  end
end
# Pauses `workflow` through `Workflows.pause/3`, passing along the post
# `action` and `trigger_date_time` received with the request.
#
# The user needs the "pause" right on "workflow::<identifier>". Responds with:
#   * `:ok` when the pause returns `{:ok, _}`
#   * `:not_found` with the message when it returns `{:error, string}`
#   * `:internal_server_error` for any other `{:error, reason}`
#   * `:not_found` "Unknown error" for any other return value
#   * `:forbidden` when the user lacks the right
defp pause(conn, workflow, user, action, trigger_date_time) do
  if has_right?("workflow::" <> workflow.identifier, user, "pause") do
    case Workflows.pause(workflow, action, trigger_date_time) do
      {:ok, _} ->
        conn
        |> put_status(:ok)
        |> json(%{status: "ok"})

      # `is_binary/1` rather than `is_bitstring/1`: only a byte-aligned
      # string can be echoed back in the JSON body.
      {:error, message} when is_binary(message) ->
        log_pause_failure(workflow, message)

        conn
        |> put_status(:not_found)
        |> json(%{status: "error", message: message})

      {:error, reason} ->
        # Non-string reasons are logged but not exposed to the client.
        log_pause_failure(workflow, reason)

        conn
        |> put_status(:internal_server_error)
        |> json(%{status: "error", message: "Internal server error"})

      _ ->
        conn
        |> put_status(:not_found)
        |> json(%{status: "error", message: "Unknown error"})
    end
  else
    conn
    |> put_status(:forbidden)
    |> json(%{status: "error", message: "Forbidden to pause this workflow"})
  end
end

# Logs a failed pause attempt together with the workflow id and identifier.
defp log_pause_failure(workflow, reason) do
  Logger.error(
    "Something went wrong pausing workflow #{workflow.id} #{workflow.identifier}: #{inspect(reason)}"
  )
end
# Resumes `workflow` through `Workflows.resume/1` when `user` holds the
# "resume" right on "workflow::<identifier>".
#
# Responds with `:ok` on `{:ok, _}`, with `:internal_server_error` (after
# logging the reason) on `{:error, reason}`, and with `:forbidden` when the
# right is missing.
defp resume(conn, workflow, user) do
  authorized = has_right?("workflow::" <> workflow.identifier, user, "resume")

  {http_status, body} =
    if authorized do
      case Workflows.resume(workflow) do
        {:ok, _} ->
          {:ok, %{status: "ok"}}

        {:error, message} ->
          Logger.error(
            "Something went wrong resuming workflow #{workflow.id} #{workflow.identifier}: #{inspect(message)}"
          )

          {:internal_server_error, %{status: "error", message: "Internal server error"}}
      end
    else
      {:forbidden, %{status: "error", message: "Forbidden to resume this workflow"}}
    end

  conn
  |> put_status(http_status)
  |> json(body)
end
# Retries job `job_id` on behalf of `user`.
#
# Requires both the "retry" right on "workflow::<identifier>" and a `:uuid`
# key on `user`. The job is loaded with its statuses and the state of its
# last status decides, in `internal_retry_handle/6`, whether the retry is
# allowed. Without the right or the uuid the answer is `:forbidden`.
defp retry(conn, workflow, user, job_id) do
  resource = "workflow::" <> workflow.identifier
  can_retry = has_right?(resource, user, "retry") && Map.has_key?(user, :uuid)

  case can_retry do
    denied when denied in [nil, false] ->
      json(
        put_status(conn, :forbidden),
        %{status: "error", message: "Forbidden to retry this workflow"}
      )

    _granted ->
      Logger.warn("User #{user.uuid} is retrying job #{job_id}")
      job = Jobs.get_job_with_status!(job_id)
      last_status = Status.get_last_status(job.status)
      internal_retry_handle(conn, workflow, job, job.name, last_status.state, user.uuid)
  end
end
# Stops the jobs of `workflow` and marks it as `:stopped`.
#
# Access is checked against the "abort" right on "workflow::<identifier>".
# When granted: the workflow's jobs are preloaded and handed to
# `Live.stop_jobs/1`, the workflow status is set to `:stopped`, and an
# "update_workflow_<id>" notification is sent before answering `:ok`.
# Otherwise the answer is `:forbidden`.
defp stop(conn, workflow, user) do
  resource = "workflow::" <> workflow.identifier

  if has_right?(resource, user, "abort") do
    preloaded = Repo.preload(workflow, [:jobs])
    Live.stop_jobs(preloaded.jobs)

    {:ok, _status} = Workflows.Status.set_workflow_status(workflow.id, :stopped)

    StepFlow.Notification.send(
      "update_workflow_" <> Integer.to_string(workflow.id),
      %{workflow_id: workflow.id}
    )

    json(put_status(conn, :ok), %{status: "ok"})
  else
    json(
      put_status(conn, :forbidden),
      %{status: "error", message: "Forbidden to stop this workflow"}
    )
  end
end
# Deletes `workflow` together with the jobs found in `workflow.jobs`.
#
# Requires the "delete" right on "workflow::<identifier>". Each job is removed
# with `Jobs.delete_job/1`, then the workflow with
# `Workflows.delete_workflow/1`, and a "delete_workflow" notification is sent.
# The results of the delete calls are not inspected. Without the right the
# answer is `:forbidden`.
defp delete(conn, workflow, user) do
  resource = "workflow::" <> workflow.identifier

  if has_right?(resource, user, "delete") do
    Enum.each(workflow.jobs, fn job -> Jobs.delete_job(job) end)
    Workflows.delete_workflow(workflow)
    StepFlow.Notification.send("delete_workflow", %{workflow_id: workflow.id})

    json(put_status(conn, :ok), %{status: "ok"})
  else
    json(
      put_status(conn, :forbidden),
      %{status: "error", message: "Forbidden to delete this workflow"}
    )
  end
end
end