defmodule StepFlow.Amqp.Helpers do
require Logger
@moduledoc """
Helpers for AMQP.
"""
@doc """
Builds the AMQP connection URL from the `StepFlow.Amqp` configuration.

The following settings are read:

- `scheme` Setup the URI scheme of the RabbitMQ service (`"amqp"` is passed as the fallback)
- `hostname` Setup the hostname of the RabbitMQ service
- `username` Setup the username of the RabbitMQ service
- `password` Setup the password of the RabbitMQ service
- `port` Setup the port of the RabbitMQ service
- `virtual_host` Setup the virtual host of the RabbitMQ service

Hardcoded example:

    config :step_flow, StepFlow.Amqp,
      hostname: "example.com",
      port: "5678",
      username: "mediacloudai",
      password: "mediacloudai",
      virtual_host: "media_cloud_ai_dev"

Environment getter example:

    config :step_flow, StepFlow.Amqp,
      hostname: {:system, "AMQP_HOSTNAME"},
      port: {:system, "AMQP_PORT"},
      username: {:system, "AMQP_USERNAME"},
      password: {:system, "AMQP_PASSWORD"},
      virtual_host: {:system, "AMQP_VIRTUAL_HOST"}

Returns the URL as a string shaped like
`scheme://username:password@hostname:port/virtual_host`.

The returned value contains the real password; the copy written to the log
has the password masked. Every part must resolve to a string, since the parts
are joined with `<>`.
"""
@spec get_amqp_connection_url() :: String.t()
def get_amqp_connection_url do
  scheme = StepFlow.Configuration.get_var_value(StepFlow.Amqp, :scheme, "amqp")
  hostname = StepFlow.Configuration.get_var_value(StepFlow.Amqp, :hostname)
  username = StepFlow.Configuration.get_var_value(StepFlow.Amqp, :username)
  password = StepFlow.Configuration.get_var_value(StepFlow.Amqp, :password)
  virtual_host = get_amqp_virtual_host()
  port = get_amqp_port()

  Logger.info("#{__MODULE__}: Connecting with hostname: #{hostname}")

  # Split the URL around the password so the secret can be swapped out for logging.
  before_password = scheme <> "://" <> username <> ":"
  after_password = "@" <> hostname <> ":" <> port <> virtual_host

  url = before_password <> password <> after_password

  # Credentials must never reach the logs: only the masked form is printed.
  Logger.info("#{__MODULE__}: Connecting with url: #{before_password}[REDACTED]#{after_password}")

  url
end
# Normalises a configuration value to an integer.
#
# - a binary (string) is parsed with `String.to_integer/1`, which raises
#   `ArgumentError` when the text is not a valid integer;
# - an integer is returned untouched.
#
# Any other term matches no clause and raises `FunctionClauseError`. The guard
# is `is_binary/1` rather than `is_bitstring/1` because `String` functions only
# work on whole-byte binaries.
defp convert_to_integer(value) when is_binary(value), do: String.to_integer(value)
defp convert_to_integer(value) when is_integer(value), do: value
# Normalises a configuration value to an atom.
#
# - a binary (string) is converted with `String.to_existing_atom/1`, so no new
#   atom is ever created from configuration text; it raises `ArgumentError`
#   when the atom does not already exist;
# - an atom (booleans included) is returned untouched.
#
# Any other term matches no clause and raises `FunctionClauseError`. The guard
# is `is_binary/1` rather than `is_bitstring/1` because `String` functions only
# work on whole-byte binaries.
defp convert_to_atom(value) when is_binary(value), do: String.to_existing_atom(value)
defp convert_to_atom(value) when is_atom(value), do: value
# Returns `path` unchanged when something exists at that location on disk,
# otherwise raises a `RuntimeError` naming the missing path.
defp check_file_exists(path) do
  if File.exists?(path) do
    path
  else
    raise "Such a file does not exist: #{path}"
  end
end
# Adds one entry to the `options` keyword list from a `StepFlow.Amqp` setting.
#
# - `options`: keyword list being built
# - `key`: key to store the entry under
# - `variable`: name of the `StepFlow.Amqp` setting to read
# - `update_function`: one-argument function applied to the setting's value
#   before it is stored (validation / type conversion)
#
# When the setting is `nil` or an empty string, `options` is returned untouched.
# Anything raised by `update_function` propagates to the caller.
defp add_to_options(options, key, variable, update_function) do
  case StepFlow.Configuration.get_var_value(StepFlow.Amqp, variable) do
    unset when unset in [nil, ""] ->
      options

    value ->
      # Store the already-transformed value in one step.
      Keyword.put(options, key, update_function.(value))
  end
end
# Collects the SSL options from the `StepFlow.Amqp` settings.
#
# Each row is {option key, setting name, function applied to the setting's value}.
# Rows are processed top to bottom; entries whose final value is `nil` are
# dropped from the result.
defp get_amqp_connection_ssl_options do
  ssl_settings = [
    {:cacertfile, :ssl_cacertfile, &check_file_exists/1},
    {:certfile, :ssl_certfile, &check_file_exists/1},
    {:keyfile, :ssl_keyfile, &check_file_exists/1},
    {:depth, :ssl_depth, &convert_to_integer/1},
    {:verify, :ssl_verify, &convert_to_atom/1},
    {:fail_if_no_peer_cert, :ssl_fail_if_no_peer_cert, &convert_to_atom/1}
  ]

  collected =
    Enum.reduce(ssl_settings, [], fn {key, variable, transform}, acc ->
      add_to_options(acc, key, variable, transform)
    end)

  Enum.reject(collected, fn {_key, value} -> is_nil(value) end)
end
@doc """
Returns the extra options to use when opening the AMQP connection.

The result is `[ssl_options: ssl_options]` when at least one SSL setting is
configured, and `[]` otherwise. The resulting options are logged at info level.
"""
@spec get_amqp_connection_options() :: keyword()
def get_amqp_connection_options do
  # The SSL helper always returns a list, so only the empty and non-empty
  # cases need to be told apart.
  options =
    case get_amqp_connection_ssl_options() do
      [] -> []
      ssl_options -> [ssl_options: ssl_options]
    end

  Logger.info("#{__MODULE__}: Connecting with options: #{inspect(options)}")
  options
end
# Reads the `:port` setting (5672 is passed as the fallback) and hands it to
# `StepFlow.Configuration.to_string/1`, so the port can be concatenated into a URL.
defp get_amqp_port do
  port = StepFlow.Configuration.get_var_value(StepFlow.Amqp, :port, 5672)
  StepFlow.Configuration.to_string(port)
end
# Returns the URL path segment for the `:virtual_host` setting: an empty string
# when the setting is empty ("" is passed as the fallback), otherwise the
# virtual host prefixed with "/".
defp get_amqp_virtual_host do
  virtual_host = StepFlow.Configuration.get_var_value(StepFlow.Amqp, :virtual_host, "")

  if virtual_host == "" do
    ""
  else
    "/" <> virtual_host
  end
end
@doc """
Returns `options` with the `:persistent` flag set.

`:persistent` is `true` when `get_amqp_delivery_mode/0` returns `2` and `false`
otherwise. Any `:persistent` entry already present in `options` is replaced.
"""
def get_amqp_message_options(options \\ []) do
  persistent? = get_amqp_delivery_mode() == 2
  Keyword.put(options, :persistent, persistent?)
end
@doc """
Returns the `:delivery_mode` setting of `StepFlow.Amqp` as an integer.

`"1"` is passed as the fallback value; string values are parsed to integers.
"""
def get_amqp_delivery_mode do
  delivery_mode = StepFlow.Configuration.get_var_value(StepFlow.Amqp, :delivery_mode, "1")
  convert_to_integer(delivery_mode)
end
@doc """
Returns the `:server_configuration` setting of `StepFlow.Amqp`.

`"standalone"` is passed as the fallback value.
"""
def get_amqp_server_configuration do
  server_configuration =
    StepFlow.Configuration.get_var_value(StepFlow.Amqp, :server_configuration, "standalone")

  server_configuration
end
end