defmodule StepFlow.NotificationHooks.NotificationHookManager do
@moduledoc """
Notification manager context.
"""
alias StepFlow.Jobs
alias StepFlow.NotificationHooks.NotificationEndpoints
alias StepFlow.NotificationHooks.NotificationTemplates
alias StepFlow.Workflows
require Logger
def get_last_status(workflow_notification_data) do
if StepFlow.Map.get_by_key_or_atom(workflow_notification_data, "status") == nil do
[]
else
StepFlow.Map.get_by_key_or_atom(workflow_notification_data, "status")
end
end
def notification_slack_from_job(job, channel, exposed_domain_name, description) do
send(
:step_flow_slack_bot,
{:message,
"Error for job #{job.name} ##{job.id} <#{exposed_domain_name}/workflows/#{job.workflow_id} |Open Workflow>\n```#{description}```",
channel}
)
end
def notification_teams_from_job(job, url, workflow, exposed_domain_name, description) do
headers = [{"content-type", "application/json"}]
body =
Poison.encode!(%{
"@context": "https://schema.org/extensions",
"@type": "MessageCard",
potentialAction: [
%{
"@type": "OpenUri",
name: "View Workflow",
targets: [
%{
os: "default",
uri: "#{exposed_domain_name}/workflows/#{job.workflow_id}"
}
]
}
],
sections: [
%{
facts: [
%{
name: "Posted By:",
value: "#{exposed_domain_name}"
},
%{
name: "Workflow Reference:",
value: "#{workflow.reference}"
},
%{
name: "Workflow ID:",
value: "#{workflow.id}"
},
%{
name: "Workflow Identifier:",
value: "#{workflow.identifier}"
},
%{
name: "Job ID:",
value: "#{job.id}"
},
%{
name: "Job:",
value: "#{job.name}"
}
],
text: "```#{description}```"
}
],
summary: "MCAI Workflow Error Notification",
themeColor: "FF5733",
title: "MCAI Workflow Error Notification"
})
Logger.debug(
"#{__MODULE__}: POST #{url}, headers: #{inspect(headers)}, body: #{inspect(body)}"
)
case HTTPoison.post(url, body, headers) do
{:ok, %{body: body, headers: _, request: _, request_url: _, status_code: 200}} ->
{:ok, body}
{:ok, response} ->
Logger.error("Unable to notify: #{inspect(response)}")
{:error, error} ->
Logger.error("Unable to notify: #{inspect(error)}")
end
end
def notification_from_job(job_id, description \\ nil) do
job = Jobs.get_job!(job_id)
topic = "update_workflow_" <> Integer.to_string(job.workflow_id)
channel = StepFlow.Configuration.get_slack_channel()
workflow = Workflows.get_workflow!(job.workflow_id)
exposed_domain_name = StepFlow.Configuration.get_exposed_domain_name()
slack_token = StepFlow.Configuration.get_slack_token()
if !(blank?(slack_token) or blank?(description) or blank?(channel)) do
notification_slack_from_job(job, channel, exposed_domain_name, description)
end
teams_url = StepFlow.Configuration.get_teams_url()
if !(blank?(teams_url) or blank?(description)) do
notification_teams_from_job(job, teams_url, workflow, exposed_domain_name, description)
end
StepFlow.Notification.send(topic, %{workflow_id: job.workflow_id})
end
def notify_endpoint_with_status(
workflow_notification_url,
job,
workflow,
exposed_domain_name,
status,
notification_template_name,
workflow_endpoint_credentials
)
when is_binary(notification_template_name) do
case NotificationTemplates.get_notification_template_by_name(notification_template_name) do
nil ->
Logger.error("#{__MODULE__}: There is no notification template under that name")
{:error, "There is no notification template under that name."}
notification_template ->
notify_endpoint_with_status(
workflow_notification_url,
job,
workflow,
exposed_domain_name,
status,
notification_template,
workflow_endpoint_credentials
)
end
end
def notify_endpoint_with_status(
workflow_notification_url,
job,
workflow,
exposed_domain_name,
status,
notification_template,
workflow_endpoint_credentials
) do
case JSON.decode(notification_template.template_headers) do
{:error, err} ->
Logger.error("#{__MODULE__}: #{inspect(err)}")
{:error, "Couldn't parse the provided template headers."}
{:ok, json_headers} ->
fill_notification_template_body(
workflow_notification_url,
job,
workflow,
exposed_domain_name,
json_headers
|> Enum.map(fn elem -> Map.to_list(elem) end)
|> Enum.concat(),
status,
notification_template,
workflow_endpoint_credentials
)
end
end
defp fill_notification_template_body(
workflow_notification_url,
job,
workflow,
exposed_domain_name,
headers,
status,
notification_template,
workflow_endpoint_credentials
) do
accepted_parameters = %{
# Replacement for ternary operator
"exposed_domain_name" => (is_nil(exposed_domain_name) && "") || exposed_domain_name,
"workflow.id" => Integer.to_string(workflow.id),
"workflow.reference" => workflow.reference,
"workflow.identifier" => workflow.identifier,
"job.name" => (is_nil(job) && "") || job.name,
"job.id" => (is_nil(job) && "") || Integer.to_string(job.id),
"status" => status
}
case intern_template_process(notification_template.template_body, accepted_parameters) do
{:error, message} ->
{:error, message}
{:ok, template} ->
send_notification(
workflow_notification_url,
template,
headers,
job,
accepted_parameters,
workflow_endpoint_credentials
)
end
end
defp send_notification(
workflow_notification_url,
template_body_string,
headers,
job,
accepted_parameters,
workflow_endpoint_credentials
) do
body =
case JSON.decode(template_body_string) do
{:error, err} ->
Logger.error("#{__MODULE__}: #{inspect(err)}")
{:error, "Couldn't parse the provided template body."}
{:ok, json_body} ->
json_body
|> JSON.encode!()
end
options =
if workflow_endpoint_credentials != nil do
[username, password] = String.split(workflow_endpoint_credentials, ":", trim: true)
[hackney: [basic_auth: {username, password}]]
else
[]
end
Logger.debug(
"#{__MODULE__}: POST #{workflow_notification_url}, headers: #{inspect(headers)}, body: #{inspect(body)}"
)
case HTTPoison.post(workflow_notification_url, body, headers, options) do
{:error, err} ->
Logger.error("#{__MODULE__}: #{inspect(err)}")
{:error, "Couldn't post the notification with the provided notification template."}
{:ok, response} ->
if Map.has_key?(response, :status_code) do
condition =
if job do
"job_" <>
accepted_parameters["status"] <>
"_" <> Integer.to_string(job.id)
else
"workflow_" <> accepted_parameters["status"]
end
{
:ok,
%{
"condition" => condition,
"timestamp" =>
NaiveDateTime.utc_now()
|> NaiveDateTime.to_string()
|> NaiveDateTime.from_iso8601!()
|> NaiveDateTime.truncate(:second),
"response" => response.status_code
}
}
else
{:error, "The associated URL didn't reply when queried."}
end
end
end
def retrieve_status(
notification_endpoint,
_conditions,
_conditions_with_templates,
_status,
_workflow_notification_data,
_job,
_workflow,
_exposed_domain_name
)
when is_nil(notification_endpoint) do
Logger.error("#{__MODULE__}: no endpoints were defined.")
nil
end
def retrieve_status(
notification_endpoint,
conditions,
conditions_with_templates,
status,
workflow_notification_data,
job,
workflow,
exposed_domain_name
)
when status in ["error", "completed"] and job != nil do
cond do
Enum.member?(
conditions,
"job_completed"
) and status == "completed" ->
notify_endpoint(
notification_endpoint,
job,
workflow,
exposed_domain_name,
status,
conditions_with_templates["job_completed"],
retrieve_credentials(workflow_notification_data, notification_endpoint)
)
Enum.member?(
conditions,
"job_error"
) and status == "error" ->
notify_endpoint(
notification_endpoint,
job,
workflow,
exposed_domain_name,
status,
conditions_with_templates["job_error"],
retrieve_credentials(workflow_notification_data, notification_endpoint)
)
true ->
nil
end
end
def retrieve_status(
notification_endpoint,
conditions,
conditions_with_templates,
status,
workflow_notification_data,
nil,
workflow,
exposed_domain_name
)
when status in ["error", "completed"] do
cond do
Enum.member?(
conditions,
"workflow_completed"
) and status == "completed" ->
notify_endpoint(
notification_endpoint,
nil,
workflow,
exposed_domain_name,
status,
conditions_with_templates["workflow_completed"],
retrieve_credentials(workflow_notification_data, notification_endpoint)
)
Enum.member?(
conditions,
"workflow_error"
) and status == "error" ->
notify_endpoint(
notification_endpoint,
nil,
workflow,
exposed_domain_name,
status,
conditions_with_templates["workflow_error"],
retrieve_credentials(workflow_notification_data, notification_endpoint)
)
true ->
nil
end
end
def retrieve_status(
_notification_endpoint,
_conditions,
_conditions_with_templates,
status,
_workflow_notification_data,
_job,
_workflow,
_exposed_domain_name
)
when status not in ["error", "completed"],
do: nil
defp notify_endpoint(
notification_endpoint,
job,
workflow,
exposed_domain_name,
status,
conditions_with_templates,
credentials
) do
case notification_endpoint
|> StepFlow.Map.get_by_key_or_atom("endpoint_url", "")
|> notify_endpoint_with_status(
job,
workflow,
exposed_domain_name,
status,
conditions_with_templates,
credentials
) do
{:error, message} ->
Logger.error("#{__MODULE__}: #{message}")
nil
{:ok, payload} ->
payload
end
end
def manage_notification_status(workflow_id, job, status) do
workflow = Workflows.get_workflow!(workflow_id)
exposed_domain_name = StepFlow.Configuration.get_exposed_domain_name()
workflow_notifications_with_status =
for workflow_notification_data <- workflow.notification_hooks do
conditions_with_templates =
StepFlow.Map.get_by_key_or_atom(workflow_notification_data, "conditions")
last_status = get_last_status(workflow_notification_data)
endpoint_key = StepFlow.Map.get_by_key_or_atom(workflow_notification_data, "endpoint")
case {NotificationEndpoints.get_notification_endpoint_by_placeholder(endpoint_key),
is_map(conditions_with_templates)} do
{notification_endpoint, true} ->
conditions =
conditions_with_templates
|> Map.keys()
new_status =
retrieve_status(
notification_endpoint,
conditions,
conditions_with_templates,
status,
workflow_notification_data,
job,
workflow,
exposed_domain_name
)
if new_status != nil do
Map.put(workflow_notification_data, "status", last_status ++ [new_status])
else
workflow_notification_data
end
{nil, _} ->
Logger.error("Could not find an url for template endpoint key #{endpoint_key}")
{_, false} ->
Logger.error("Condition templates is not a map!")
end
end
{_, workflow} =
Workflows.update_workflow(workflow, %{
notification_hooks: workflow_notifications_with_status
})
workflow
end
defp retrieve_credentials(notification_data, notification_endpoint) do
notification_endpoint_credentials =
StepFlow.Map.get_by_key_or_atom(notification_endpoint, "endpoint_credentials")
cond do
!blank?(notification_endpoint_credentials) ->
notification_endpoint_credentials
!blank?(
notification_data_credentials =
StepFlow.Map.get_by_key_or_atom(notification_data, "credentials")
) ->
notification_data_credentials
true ->
nil
end
end
defp intern_template_process(template, accepted_parameters) do
accepted_parameters
|> Map.to_list()
|> replace(template)
|> check_remaining_to_template()
end
defp replace([], template), do: template
defp replace([{key, value} | keys], template) do
template =
String.replace(
template,
"#" <> key <> "#",
value
)
replace(keys, template)
end
defp check_remaining_to_template(template) do
case Regex.scan(~r/#(?:[[:word:]][._()-]?)*#/, template) do
[] ->
{:ok, template}
array ->
message = "Following parameters are not availables : #{inspect(array)}.
Please check the documentation for a list of available parameters."
Logger.error(message)
{:error, message}
end
end
defp blank?(path),
do: "" == path |> to_string() |> String.trim()
end