defmodule StepFlow.Amqp.CommonEmitter do
@moduledoc """
A common emitter to send job orders to workers.
"""
require Logger
alias StepFlow.Amqp.Connection
alias StepFlow.Amqp.Helpers
alias StepFlow.Metrics.JobInstrumenter
@doc """
Publish a message.
Example:
```elixir
StepFlow.Amqp.CommonEmitter.publish_json("my_rabbit_mq_queue", "{\\\"key\\\": \\\"value\\\"}")
```
"""
def publish(queue, message, options \\ [], exchange \\ "job_submit") do
options =
options
|> Helpers.get_amqp_message_options()
Connection.publish(queue, message, options, exchange)
end
@doc """
Publish a message using JSON serialization before send it.
Example:
```elixir
StepFlow.Amqp.CommonEmitter.publish_json("my_rabbit_mq_queue", 0, %{key: "value"})
```
"""
def publish_json(queue, priority, message, exchange \\ "job_submit", options \\ []) do
message =
message
|> check_message_parameters
|> Jason.encode!()
options = options ++ [priority: min(priority, 100)]
JobInstrumenter.inc(:step_flow_jobs_status_total, queue, "created")
publish(queue, message, options, exchange)
end
defp check_message_parameters(message) do
parameters =
message
|> StepFlow.Map.get_by_key_or_atom(:parameters, [])
|> Enum.filter(fn param ->
StepFlow.Map.get_by_key_or_atom(param, :type) != "filter"
end)
StepFlow.Map.replace_by_atom(message, :parameters, parameters)
end
end