defmodule StepFlow.NotificationHooks.NotificationHookManager do
@moduledoc """
Notification manager context.
"""
alias StepFlow.Jobs
alias StepFlow.NotificationHooks.NotificationEndpoints
alias StepFlow.Workflows
require Logger
@doc """
Returns the value stored under `"status"` in `workflow_notification_data`.

The value is read through `StepFlow.Map.get_by_key_or_atom/2`; when that
lookup yields `nil`, an empty list is returned instead.
"""
def get_last_status(workflow_notification_data) do
  case StepFlow.Map.get_by_key_or_atom(workflow_notification_data, "status") do
    nil -> []
    status -> status
  end
end
@doc """
Sends an error message about `job` to the process registered as
`:step_flow_slack_bot`.

The message is a `{:message, text, channel}` tuple. `text` contains the job
name and id, a link to the workflow page under `exposed_domain_name`, and
`description` wrapped in a code fence.

Returns the sent message, as `send/2` does.
"""
def notification_slack_from_job(job, channel, exposed_domain_name, description) do
  text =
    "Error for job #{job.name} ##{job.id} <#{exposed_domain_name}/workflows/#{job.workflow_id} |Open Workflow>\n```#{description}```"

  send(:step_flow_slack_bot, {:message, text, channel})
end
@doc """
Posts a job error notification to the webhook at `url`.

The request body is a JSON `MessageCard` holding the job and workflow
identifiers, `description` as text, and a "View Workflow" link pointing to
the workflow page under `exposed_domain_name`.

Returns `{:ok, response_body}` when the endpoint answers with status 200.
Any other outcome (non-200 status or transport failure) is logged as an
error and the return value of `Logger.error/1` is returned; the function
does not raise on a failed request.
"""
def notification_teams_from_job(job, url, workflow, exposed_domain_name, description) do
  headers = [{"content-type", "application/json"}]

  body =
    Poison.encode!(%{
      "@context": "https://schema.org/extensions",
      "@type": "MessageCard",
      potentialAction: [
        %{
          "@type": "OpenUri",
          name: "View Workflow",
          targets: [
            %{
              os: "default",
              uri: "#{exposed_domain_name}/workflows/#{job.workflow_id}"
            }
          ]
        }
      ],
      sections: [
        %{
          facts: [
            %{name: "Posted By:", value: "#{exposed_domain_name}"},
            %{name: "Workflow Reference:", value: "#{workflow.reference}"},
            %{name: "Workflow ID:", value: "#{workflow.id}"},
            %{name: "Workflow Identifier:", value: "#{workflow.identifier}"},
            %{name: "Job ID:", value: "#{job.id}"},
            %{name: "Job:", value: "#{job.name}"}
          ],
          text: "```#{description}```"
        }
      ],
      summary: "MCAI Workflow Error Notification",
      themeColor: "FF5733",
      title: "MCAI Workflow Error Notification"
    })

  Logger.debug(
    "#{__MODULE__}: POST #{url}, headers: #{inspect(headers)}, body: #{inspect(body)}"
  )

  case HTTPoison.post(url, body, headers) do
    {:ok, %{status_code: 200, body: response_body}} ->
      {:ok, response_body}

    {:ok, response} ->
      Logger.error("Unable to notify: #{inspect(response)}")

    # A transport failure (DNS, refused connection, timeout, ...) must not
    # crash the caller: it is reported like any other failed notification.
    {:error, error} ->
      Logger.error("Unable to notify: #{inspect(error)}")
  end
end
@doc """
Dispatches the notifications related to the job `job_id`.

  * A Slack message is sent when the Slack token, the Slack channel and
    `description` are all non-blank.
  * A Teams notification is sent when the Teams URL and `description` are
    both non-blank.
  * In every case an `update_workflow_<workflow id>` message carrying the
    workflow id is sent through `StepFlow.Notification.send/2`, whose
    result is returned.

Raises if the job or its workflow cannot be loaded (`get_job!` /
`get_workflow!`).
"""
def notification_from_job(job_id, description \\ nil) do
  job = Jobs.get_job!(job_id)
  workflow_id = job.workflow_id
  topic = "update_workflow_" <> Integer.to_string(workflow_id)

  channel = StepFlow.Configuration.get_slack_channel()
  workflow = Workflows.get_workflow!(workflow_id)
  exposed_domain_name = StepFlow.Configuration.get_exposed_domain_name()
  slack_token = StepFlow.Configuration.get_slack_token()

  slack_enabled? =
    not blank?(slack_token) and not blank?(description) and not blank?(channel)

  if slack_enabled? do
    notification_slack_from_job(job, channel, exposed_domain_name, description)
  end

  teams_url = StepFlow.Configuration.get_teams_url()
  teams_enabled? = not blank?(teams_url) and not blank?(description)

  if teams_enabled? do
    notification_teams_from_job(job, teams_url, workflow, exposed_domain_name, description)
  end

  StepFlow.Notification.send(topic, %{workflow_id: workflow_id})
end
@doc """
Posts the workflow status `status_wf` to `workflow_notification_url` and
returns a status record describing the attempt.

The request body is a JSON `MessageCard`. Its title and colour are the
"Error" variant when `status_wf` is `"workflow_error"` and the "Success"
variant for any other value.

`workflow_endpoint_credentials` is either `nil` (no authentication) or a
`"username:password"` string used for HTTP basic authentication. Only the
first `:` separates the two parts, so the password may itself contain
colons. A non-nil value without any `:` raises a `MatchError`.

Returns a map with the keys:

  * `"condition"` - `status_wf`
  * `"timestamp"` - current UTC `NaiveDateTime`, truncated to the second
  * `"response"` - HTTP status code of the reply, or `nil` when the request
    could not be performed (the failure is logged)
"""
def notify_endpoint_with_workflow_status(
      workflow_notification_url,
      workflow,
      exposed_domain_name,
      status_wf,
      workflow_endpoint_credentials
    ) do
  headers = [{"content-type", "application/json"}]

  # Only the title and the colour of the card depend on the outcome.
  {title, theme_color} =
    if status_wf == "workflow_error" do
      {"MCAI Workflow Error Notification", "FF5733"}
    else
      {"MCAI Workflow Success Notification", "339933"}
    end

  body =
    Poison.encode!(%{
      "@context": "https://schema.org/extensions",
      "@type": "MessageCard",
      potentialAction: [
        %{
          "@type": "OpenUri",
          name: "View Workflow",
          targets: [
            %{
              os: "default",
              uri: "#{exposed_domain_name}/workflows/#{workflow.id}"
            }
          ]
        }
      ],
      sections: [
        %{
          facts: [
            %{name: "Posted By", value: "#{exposed_domain_name}"},
            %{name: "Workflow Reference", value: "#{workflow.reference}"},
            %{name: "Workflow ID", value: "#{workflow.id}"},
            %{name: "Workflow Identifier", value: "#{workflow.identifier}"},
            %{name: "Workflow Status", value: "#{status_wf}"}
          ]
        }
      ],
      summary: title,
      themeColor: theme_color,
      title: title
    })

  options =
    if workflow_endpoint_credentials != nil do
      # `parts: 2` keeps any further ":" inside the password.
      [username, password] = String.split(workflow_endpoint_credentials, ":", parts: 2)
      [hackney: [basic_auth: {username, password}]]
    else
      []
    end

  Logger.debug(
    "#{__MODULE__}: POST #{workflow_notification_url}, headers: #{inspect(headers)}, body: #{inspect(body)}"
  )

  response_code =
    case HTTPoison.post(workflow_notification_url, body, headers, options) do
      {:ok, response} ->
        response.status_code

      # The error struct carries no status code; record the attempt without one.
      {:error, error} ->
        Logger.error("Unable to notify #{workflow_notification_url}: #{inspect(error)}")
        nil
    end

  %{
    "condition" => status_wf,
    "timestamp" => NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second),
    "response" => response_code
  }
end
@doc """
Posts the job status `status_job` to `workflow_notification_url` and
returns a status record describing the attempt.

The request body is a JSON `MessageCard`. Its title and colour are the
"Error" variant when `status_job` is `"job_error"` and the "Success"
variant for any other value.

`workflow_endpoint_credentials` is either `nil` (no authentication) or a
`"username:password"` string used for HTTP basic authentication. Only the
first `:` separates the two parts, so the password may itself contain
colons. A non-nil value without any `:` raises a `MatchError`.

Returns a map with the keys:

  * `"condition"` - `status_job` followed by `_` and the job id
  * `"timestamp"` - current UTC `NaiveDateTime`, truncated to the second
  * `"response"` - HTTP status code of the reply, or `nil` when the request
    could not be performed (the failure is logged)
"""
def notify_endpoint_with_job_status(
      workflow_notification_url,
      job,
      workflow,
      exposed_domain_name,
      status_job,
      workflow_endpoint_credentials
    ) do
  headers = [{"content-type", "application/json"}]

  # Only the title and the colour of the card depend on the outcome.
  {title, theme_color} =
    if status_job == "job_error" do
      {"MCAI Job Error Notification", "FF5733"}
    else
      {"MCAI Job Success Notification", "339933"}
    end

  body =
    Poison.encode!(%{
      "@context": "https://schema.org/extensions",
      "@type": "MessageCard",
      potentialAction: [
        %{
          "@type": "OpenUri",
          name: "View Workflow",
          targets: [
            %{
              os: "default",
              uri: "#{exposed_domain_name}/workflows/#{workflow.id}"
            }
          ]
        }
      ],
      sections: [
        %{
          facts: [
            %{name: "Posted By", value: "#{exposed_domain_name}"},
            %{name: "Workflow Reference", value: "#{workflow.reference}"},
            %{name: "Workflow ID", value: "#{workflow.id}"},
            %{name: "Job Id", value: "#{job.id}"},
            %{name: "Job Status", value: "#{status_job}"}
          ]
        }
      ],
      summary: title,
      themeColor: theme_color,
      title: title
    })

  options =
    if workflow_endpoint_credentials != nil do
      # `parts: 2` keeps any further ":" inside the password.
      [username, password] = String.split(workflow_endpoint_credentials, ":", parts: 2)
      [hackney: [basic_auth: {username, password}]]
    else
      []
    end

  Logger.debug(
    "#{__MODULE__}: POST #{workflow_notification_url}, headers: #{inspect(headers)}, body: #{inspect(body)}"
  )

  response_code =
    case HTTPoison.post(workflow_notification_url, body, headers, options) do
      {:ok, response} ->
        response.status_code

      # The error struct carries no status code; record the attempt without one.
      {:error, error} ->
        Logger.error("Unable to notify #{workflow_notification_url}: #{inspect(error)}")
        nil
    end

  %{
    "condition" => status_job <> "_" <> Integer.to_string(job.id),
    "timestamp" => NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second),
    "response" => response_code
  }
end
@doc """
Notifies `notification_endpoint` when the hook `conditions` ask for it and
returns the resulting status record, or `nil` when nothing was sent.

`status` is only acted upon when it is `"error"` or `"completed"`:

  * with a non-nil `job`, the condition looked for in `conditions` is
    `"job_error"` / `"job_completed"` and the job notification is posted;
  * with a `nil` job, the condition is `"workflow_error"` /
    `"workflow_completed"` and the workflow notification is posted.

The target URL is the endpoint's `"endpoint_url"`; credentials are resolved
from the endpoint first and from `workflow_notification_data` otherwise.
Any other `status` returns `nil`.
"""
def retrieve_status(
      notification_endpoint,
      conditions,
      status,
      workflow_notification_data,
      job,
      workflow,
      exposed_domain_name
    )
    when status in ["error", "completed"] and job != nil do
  condition = "job_" <> status

  if Enum.member?(conditions, condition) do
    endpoint_url = StepFlow.Map.get_by_key_or_atom(notification_endpoint, "endpoint_url")
    credentials = retrieve_credentials(workflow_notification_data, notification_endpoint)

    notify_endpoint_with_job_status(
      endpoint_url,
      job,
      workflow,
      exposed_domain_name,
      condition,
      credentials
    )
  else
    nil
  end
end

def retrieve_status(
      notification_endpoint,
      conditions,
      status,
      workflow_notification_data,
      # Workflow status check
      nil,
      workflow,
      exposed_domain_name
    )
    when status in ["error", "completed"] do
  condition = "workflow_" <> status

  if Enum.member?(conditions, condition) do
    endpoint_url = StepFlow.Map.get_by_key_or_atom(notification_endpoint, "endpoint_url")
    credentials = retrieve_credentials(workflow_notification_data, notification_endpoint)

    notify_endpoint_with_workflow_status(
      endpoint_url,
      workflow,
      exposed_domain_name,
      condition,
      credentials
    )
  else
    nil
  end
end

def retrieve_status(
      _notification_endpoint,
      _conditions,
      status,
      _workflow_notification_data,
      _job,
      _workflow,
      _exposed_domain_name
    )
    when status not in ["error", "completed"] do
  nil
end
@doc """
Runs every notification hook of a workflow for a status change and stores
the outcome of each notification on the workflow.

  * `manage_notification_status(job_id, "job", job_status)` loads the job
    and its workflow and evaluates the job conditions of each hook.
  * `manage_notification_status(workflow_id, "workflow", workflow_status)`
    loads the workflow and evaluates the workflow conditions of each hook.

For each hook whose endpoint is known and whose conditions triggered a
notification, the new status record is appended to the hook's `"status"`
list. Hooks whose endpoint placeholder is unknown are logged and kept
unchanged. The hooks are then written back with
`Workflows.update_workflow/2`, and the second element of its result tuple
is returned.

Raises if the job or the workflow cannot be loaded (`get_job!` /
`get_workflow!`).
"""
def manage_notification_status(job_id, "job", job_status) do
  job = Jobs.get_job!(job_id)
  workflow = Workflows.get_workflow!(job.workflow_id)

  run_notification_hooks(workflow, job, job_status)
end

def manage_notification_status(workflow_id, "workflow", workflow_status) do
  workflow = Workflows.get_workflow!(workflow_id)

  # A nil job selects the workflow-level clause of retrieve_status/7.
  run_notification_hooks(workflow, nil, workflow_status)
end

# Evaluates all hooks of `workflow` for `status` and persists the updated hooks.
defp run_notification_hooks(workflow, job, status) do
  exposed_domain_name = StepFlow.Configuration.get_exposed_domain_name()

  workflow_notifications_with_status =
    for workflow_notification_data <- workflow.notification_hooks do
      run_notification_hook(
        workflow_notification_data,
        status,
        job,
        workflow,
        exposed_domain_name
      )
    end

  # NOTE(review): on failure the second element is presumably not a workflow
  # (e.g. a changeset) - confirm against Workflows.update_workflow/2.
  {_, workflow} =
    Workflows.update_workflow(workflow, %{
      notification_hooks: workflow_notifications_with_status
    })

  workflow
end

# Runs a single hook and returns it, with the new status record appended
# when a notification was sent.
defp run_notification_hook(
       workflow_notification_data,
       status,
       job,
       workflow,
       exposed_domain_name
     ) do
  conditions = StepFlow.Map.get_by_key_or_atom(workflow_notification_data, "conditions")
  last_status = get_last_status(workflow_notification_data)
  endpoint_key = StepFlow.Map.get_by_key_or_atom(workflow_notification_data, "endpoint")

  case NotificationEndpoints.get_notification_endpoint_by_placeholder(endpoint_key) do
    nil ->
      Logger.error("Could not find an url for template endpoint key #{endpoint_key}")
      # Keep the hook as it is: returning the Logger result here would
      # replace the hook by `:ok` in the list that gets persisted.
      workflow_notification_data

    notification_endpoint ->
      new_status =
        retrieve_status(
          notification_endpoint,
          conditions,
          status,
          workflow_notification_data,
          job,
          workflow,
          exposed_domain_name
        )

      if new_status != nil do
        Map.put(workflow_notification_data, "status", last_status ++ [new_status])
      else
        workflow_notification_data
      end
  end
end
# Picks the credentials to use for a notification.
#
# The endpoint's "endpoint_credentials" win when they are not blank;
# otherwise the hook's own "credentials" are used when not blank.
# Returns nil when neither is set.
defp retrieve_credentials(notification_data, notification_endpoint) do
  endpoint_credentials =
    StepFlow.Map.get_by_key_or_atom(notification_endpoint, "endpoint_credentials")

  if blank?(endpoint_credentials) do
    # Only consult the hook data once the endpoint has nothing to offer.
    hook_credentials = StepFlow.Map.get_by_key_or_atom(notification_data, "credentials")

    if blank?(hook_credentials), do: nil, else: hook_credentials
  else
    endpoint_credentials
  end
end
# True when the string form of `path` is empty once surrounding whitespace is
# removed. `nil` converts to "" and therefore counts as blank.
defp blank?(path) do
  trimmed = String.trim(to_string(path))
  trimmed == ""
end
end