defmodule OffBroadway.Splunk.Producer do
@moduledoc """
GenStage producer for a Splunk event stream.
The Broadway producer acts as a consumer for Splunk reports or alerts.
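## Example
Starting a pipeline with this producer could look something like the sketch
below. The module name and option values are illustrative placeholders; see
the producer options for the full set of settings.
```
defmodule MyBroadway do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {OffBroadway.Splunk.Producer, name: "My saved report"}
      ],
      processors: [default: []]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    # The Splunk event payload is available in `message.data`.
    message
  end
end
```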
## Producer Options
#{NimbleOptions.docs(OffBroadway.Splunk.Options.definition())}
## Acknowledgements
You can use the `on_success` and `on_failure` options to control how messages are
acknowledged. You can set these options when starting the Splunk producer or change
them for each message through `Broadway.Message.configure_ack/2`. By default, successful
messages are acked (`:ack`) and failed messages are not (`:noop`).
The possible values for `:on_success` and `:on_failure` are:
* `:ack` - acknowledge the message. Splunk has no concept of acking messages,
because we are simply consuming messages from a web API endpoint.
For now we just execute a `:telemetry` event for acked messages.
* `:noop` - do not acknowledge the message. No action is taken.
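For example, a processor callback in your own Broadway module (an illustrative
sketch) can turn off the ack telemetry event for a given message:
```
def handle_message(_processor, message, _context) do
  Broadway.Message.configure_ack(message, on_success: :noop)
end
```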
## Telemetry
This library exposes the following telemetry events:
* `[:off_broadway_splunk, :receive_jobs, :start]` - Dispatched before fetching jobs
from Splunk.
* measurement: `%{monotonic_time: native_time, system_time: native_time}`
* metadata: `%{name: string, count: integer}`
* `[:off_broadway_splunk, :receive_jobs, :stop]` - Dispatched when fetching jobs from Splunk
is complete.
* measurement: `%{duration: native_time, monotonic_time: native_time}`
* metadata: `%{name: string, count: integer}`
* `[:off_broadway_splunk, :receive_jobs, :exception]` - Dispatched after a failure while fetching
jobs from Splunk.
* measurement: `%{duration: native_time}`
* metadata:
```
%{
name: string,
reason: reason,
stacktrace: stacktrace
}
```
* `[:off_broadway_splunk, :process_job, :start]` - Dispatched before starting to process
messages for a job.
* measurement: `%{time: System.system_time}`
* metadata: `%{name: string, sid: string}`
* `[:off_broadway_splunk, :process_job, :stop]` - Dispatched after all messages have been
processed for a job.
* measurement: `%{time: System.system_time, processed_events: integer, processed_requests: integer}`
* metadata: `%{name: string, sid: string}`
* `[:off_broadway_splunk, :receive_messages, :start]` - Dispatched before receiving
messages from Splunk.
* measurement: `%{monotonic_time: native_time, system_time: native_time}`
* metadata: `%{name: string, sid: string, demand: integer}`
* `[:off_broadway_splunk, :receive_messages, :stop]` - Dispatched after messages have been
received from Splunk and "wrapped".
* measurement: `%{duration: native_time, monotonic_time: native_time}`
* metadata:
```
%{
name: string,
sid: string,
received: integer,
demand: integer
}
```
* `[:off_broadway_splunk, :receive_messages, :exception]` - Dispatched after a failure while
receiving messages from Splunk.
* measurement: `%{duration: native_time}`
* metadata:
```
%{
name: string,
sid: string,
demand: integer,
reason: reason,
stacktrace: stacktrace
}
```
* `[:off_broadway_splunk, :receive_messages, :ack]` - Dispatched when acking a message.
* measurement: `%{time: System.system_time, count: 1}`
* metadata:
```
%{
name: string,
receipt: receipt
}
```
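As an illustration, a handler can be attached to any of these events with
`:telemetry.attach/4` (the handler id and output below are arbitrary):
```
:telemetry.attach(
  "splunk-ack-logger",
  [:off_broadway_splunk, :receive_messages, :ack],
  fn _event, measurements, metadata, _config ->
    IO.inspect({measurements, metadata}, label: "acked")
  end,
  nil
)
```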
"""
use GenStage
alias Broadway.Producer
alias NimbleOptions.ValidationError
alias OffBroadway.Splunk.Job
@behaviour Producer
@impl true
def init(opts) do
client = opts[:splunk_client]
{:ok, client_opts} = client.init(opts)
# Use two-dimensional counters to keep track of processed events and requests.
# Index 1 is the count for the current job; index 2 is the running total.
processed_events_counter = :counters.new(2, [:atomics])
processed_requests_counter = :counters.new(2, [:atomics])
{:producer,
%{
demand: 0,
drain: false,
processed_events: processed_events_counter,
processed_requests: processed_requests_counter,
receive_timer: nil,
receive_interval: opts[:receive_interval],
refetch_timer: nil,
refetch_interval: opts[:refetch_interval],
name: opts[:name],
current_job: nil,
completed_jobs: MapSet.new(),
queue: :queue.new(),
splunk_client: {client, client_opts},
broadway: opts[:broadway][:name],
only_new: opts[:only_new],
only_latest: opts[:only_latest],
shutdown_timeout: opts[:shutdown_timeout]
}}
end
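# Validates the producer options and stores the ack configuration in
# :persistent_term under the Broadway name, where the acknowledger can find it.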
@impl true
def prepare_for_start(_module, broadway_opts) do
{producer_module, client_opts} = broadway_opts[:producer][:module]
case NimbleOptions.validate(client_opts, OffBroadway.Splunk.Options.definition()) do
{:error, error} ->
raise ArgumentError, format_error(error)
{:ok, opts} ->
ack_ref = broadway_opts[:name]
:persistent_term.put(ack_ref, %{
name: opts[:name],
config: opts[:config],
on_success: opts[:on_success],
on_failure: opts[:on_failure]
})
with_default_opts = put_in(broadway_opts, [:producer, :module], {producer_module, opts})
{[], with_default_opts}
end
end
defp format_error(%ValidationError{keys_path: [], message: message}) do
"invalid configuration given to OffBroadway.Splunk.Producer.prepare_for_start/2, " <>
message
end
defp format_error(%ValidationError{keys_path: keys_path, message: message}) do
"invalid configuration given to OffBroadway.Splunk.Producer.prepare_for_start/2 for key #{inspect(keys_path)}, " <>
message
end
@impl true
def handle_demand(incoming_demand, %{demand: demand, receive_timer: timer} = state) do
timer && Process.cancel_timer(timer)
handle_receive_messages(%{state | demand: demand + incoming_demand, receive_timer: nil})
end
@impl true
def handle_info(:receive_jobs, %{refetch_timer: timer} = state) do
timer && Process.cancel_timer(timer)
handle_receive_jobs(%{state | refetch_timer: nil})
end
def handle_info(:receive_messages, %{receive_timer: timer} = state) do
timer && Process.cancel_timer(timer)
handle_receive_messages(%{state | receive_timer: nil})
end
def handle_info(:next_job, %{receive_timer: timer} = state) do
timer && Process.cancel_timer(timer)
handle_next_job(%{state | receive_timer: nil})
end
def handle_info(
:shutdown_broadway,
%{
receive_timer: receive_timer,
refetch_timer: refetch_timer,
shutdown_timeout: timeout,
broadway: broadway
} = state
) do
receive_timer && Process.cancel_timer(receive_timer)
refetch_timer && Process.cancel_timer(refetch_timer)
Broadway.stop(broadway, :normal, timeout)
{:noreply, [], %{state | receive_timer: nil, refetch_timer: nil}}
end
def handle_info(_, state), do: {:noreply, [], state}
@impl Producer
def prepare_for_draining(%{receive_timer: receive_timer, refetch_timer: refetch_timer} = state) do
receive_timer && Process.cancel_timer(receive_timer)
refetch_timer && Process.cancel_timer(refetch_timer)
{:noreply, [], %{state | drain: true, receive_timer: nil, refetch_timer: nil}}
end
@spec handle_receive_jobs(state :: map()) :: {:noreply, [], new_state :: map()}
defp handle_receive_jobs(%{refetch_timer: nil} = state) do
new_state =
receive_jobs_from_splunk(state)
|> update_queue_from_response(state)
case new_state do
%{current_job: nil, queue: {[], []}} ->
{:noreply, [],
%{new_state | refetch_timer: schedule_receive_jobs(state.refetch_interval)}}
%{current_job: nil, queue: _queue} ->
{:noreply, [],
%{
new_state
| receive_timer: schedule_next_job(0),
refetch_timer: schedule_receive_jobs(state.refetch_interval)
}}
# A job is already being processed; just schedule another job fetch so the
# case is total when the refetch timer fires mid-job.
%{current_job: %Job{}} ->
{:noreply, [],
%{new_state | refetch_timer: schedule_receive_jobs(state.refetch_interval)}}
end
end
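# Emits a :process_job stop event for the job we just finished and marks it as
# completed, then either starts the next job from the queue or schedules a new
# job fetch if the queue is empty.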
defp handle_next_job(
%{current_job: current, completed_jobs: completed, receive_timer: nil} = state
) do
unless is_nil(current) do
:telemetry.execute(
[:off_broadway_splunk, :process_job, :stop],
%{
time: System.system_time(),
processed_events: :counters.get(state.processed_events, 1),
processed_requests: :counters.get(state.processed_requests, 1)
},
%{name: state.name, sid: current.name}
)
end
case :queue.out(state.queue) do
{{:value, job}, new_queue} ->
:ok = :counters.put(state.processed_events, 1, 0)
:ok = :counters.put(state.processed_requests, 1, 0)
:telemetry.execute(
[:off_broadway_splunk, :process_job, :start],
%{time: System.system_time()},
%{name: state.name, sid: job.name}
)
{:noreply, [],
%{
state
| current_job: job,
queue: new_queue,
completed_jobs: MapSet.put(completed, current),
receive_timer: schedule_receive_messages(0)
}}
{:empty, new_queue} ->
{:noreply, [],
%{
state
| current_job: nil,
queue: new_queue,
completed_jobs: MapSet.put(completed, current),
refetch_timer: schedule_receive_jobs(state.refetch_interval)
}}
end
end
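# While the pipeline is draining (e.g. during shutdown), stop fetching messages.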
defp handle_receive_messages(%{drain: true} = state), do: {:noreply, [], state}
defp handle_receive_messages(
%{
current_job: nil,
refetch_timer: nil,
receive_timer: nil,
queue: {[], []}
} = state
) do
with {:ok, response} <- receive_jobs_from_splunk(state),
new_state <- update_queue_from_response({:ok, response}, state) do
{:noreply, [], %{new_state | receive_timer: schedule_receive_messages(0)}}
else
{:error, _reason} ->
{:noreply, [],
%{state | receive_timer: schedule_receive_messages(state.receive_interval)}}
end
end
defp handle_receive_messages(
%{
receive_timer: nil,
current_job: %Job{},
demand: demand,
splunk_client: {_, client_opts}
} = state
)
when demand > 0 do
{messages, new_state} = receive_messages_from_splunk(state, demand)
new_demand = demand - length(messages)
max_events = client_opts[:max_events]
total_events = :counters.get(state.processed_events, 2)
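# Reached :max_events -> shut down the pipeline; job exhausted (no messages
# returned) -> move to the next job after the interval; otherwise keep fetching.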
receive_timer =
case {total_events, messages, new_state} do
{^max_events, _messages, _state} -> schedule_shutdown()
{_total_events, [], %{receive_interval: interval}} -> schedule_next_job(interval)
{_total_events, _messages, _state} -> schedule_receive_messages(0)
end
{:noreply, messages, %{new_state | demand: new_demand, receive_timer: receive_timer}}
end
defp handle_receive_messages(%{current_job: nil, receive_timer: nil} = state) do
case :queue.peek(state.queue) do
{:value, _} ->
{:noreply, [], %{state | receive_timer: schedule_next_job(0)}}
:empty ->
{:noreply, [],
%{state | receive_timer: schedule_receive_messages(state.receive_interval)}}
end
end
defp handle_receive_messages(%{receive_timer: nil} = state) do
{:noreply, [], %{state | receive_timer: schedule_receive_messages(state.receive_interval)}}
end
@spec receive_jobs_from_splunk(state :: map()) :: {:ok, Tesla.Env.t()} | {:error, any()}
defp receive_jobs_from_splunk(%{name: name, splunk_client: {client, client_opts}}) do
metadata = %{name: name, count: 0}
:telemetry.span(
[:off_broadway_splunk, :receive_jobs],
metadata,
fn ->
case client.receive_status(name, client_opts) do
{:ok, %{status: 200, body: %{"entry" => jobs}}} = response ->
{response, %{metadata | count: length(jobs)}}
{:ok, %{status: _status}} = response ->
{response, metadata}
{:error, reason} ->
{{:error, reason}, metadata}
end
end
)
end
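# Fetches up to `count` events for the current job, using the number of events
# already processed for this job as the paging offset.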
@spec receive_messages_from_splunk(state :: map(), demand :: non_neg_integer()) ::
{messages :: list(), state :: map()}
defp receive_messages_from_splunk(
%{name: name, current_job: job, splunk_client: {client, client_opts}} = state,
demand
) do
metadata = %{name: name, sid: job.name, demand: demand}
count = calculate_count(client_opts, demand, :counters.get(state.processed_events, 1))
client_opts =
Keyword.put(client_opts, :ack_ref, state.broadway)
|> Keyword.put(:query,
output_mode: "json",
count: count,
offset: :counters.get(state.processed_events, 1)
)
case count do
0 ->
{[], state}
_ ->
messages =
:telemetry.span(
[:off_broadway_splunk, :receive_messages],
metadata,
fn ->
messages = client.receive_messages(job.name, demand, client_opts)
count = length(messages)
:ok = :counters.add(state.processed_events, 1, count)
:ok = :counters.add(state.processed_events, 2, count)
:ok = :counters.add(state.processed_requests, 1, 1)
:ok = :counters.add(state.processed_requests, 2, 1)
{messages, Map.put(metadata, :received, count)}
end
)
{messages, state}
end
end
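# Builds Job structs from the response entries, drops zombie jobs, keeps only
# finished ones, and enqueues every job that is not already current, queued or
# completed.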
@spec update_queue_from_response(
response :: {:ok, Tesla.Env.t()} | {:error, any()},
state :: map()
) ::
new_state :: map()
defp update_queue_from_response({:ok, %{status: 200, body: %{"entry" => jobs}}}, state) do
jobs =
Enum.map(jobs, &merge_non_nil_fields(Job.new(&1), Job.new(Map.get(&1, "content"))))
|> Enum.reject(& &1.is_zombie)
|> Enum.filter(& &1.is_done)
|> Enum.sort_by(& &1.published, {:asc, DateTime})
# This flag can only be true *once*, on the first fetch.
# If set, add all current jobs to "completed jobs", and set the flag false
# so it will never trigger again.
completed_jobs =
if state.only_new do
Enum.reduce(jobs, state.completed_jobs, fn job, acc ->
MapSet.put(acc, job)
end)
else
state.completed_jobs
end
new_queue =
:queue.fold(
fn job, acc ->
with false <- job == state.current_job,
false <- :queue.member(job, acc),
false <- MapSet.member?(completed_jobs, job) do
:queue.in(job, acc)
else
true -> acc
end
end,
state.queue,
:queue.from_list(only_latest?(jobs, state.only_latest))
)
%{state | queue: new_queue, completed_jobs: completed_jobs, only_new: false}
end
defp update_queue_from_response({:ok, _response}, state), do: state
defp update_queue_from_response({:error, _reason}, state), do: state
@spec only_latest?(list :: list(), flag :: boolean()) :: list()
defp only_latest?(list, true), do: Enum.take(list, -1)
defp only_latest?(list, false), do: list
@spec merge_non_nil_fields(map_a :: map(), map_b :: map()) :: map()
defp merge_non_nil_fields(map_a, map_b) do
Map.merge(map_a, map_b, fn
_key, old_value, new_value when is_nil(new_value) -> old_value
_key, _old_value, new_value -> new_value
end)
end
@spec calculate_count(
client_opts :: keyword(),
demand :: non_neg_integer(),
processed_events :: non_neg_integer()
) :: non_neg_integer()
defp calculate_count(client_opts, demand, processed_events) do
case client_opts[:max_events] do
nil ->
demand
max_events ->
# Never request more events than the remaining capacity towards :max_events.
capacity = max_events - processed_events
min(capacity, demand)
end
end
@spec schedule_next_job(interval :: non_neg_integer()) :: reference()
defp schedule_next_job(interval),
do: Process.send_after(self(), :next_job, interval)
@spec schedule_receive_jobs(interval :: non_neg_integer()) :: reference()
defp schedule_receive_jobs(interval),
do: Process.send_after(self(), :receive_jobs, interval)
@spec schedule_receive_messages(interval :: non_neg_integer()) :: reference()
defp schedule_receive_messages(interval),
do: Process.send_after(self(), :receive_messages, interval)
@spec schedule_shutdown() :: reference()
defp schedule_shutdown,
do: Process.send_after(self(), :shutdown_broadway, 0)
end