defmodule ChoreRunner.Reporter do
use GenServer
require Logger
alias ChoreRunner.{Chore, ChoreSupervisor}
@process_dict_key :chore_reporter_pid
def init({opts, chore}) do
pubsub = Keyword.get(opts, :pubsub)
finished_function = Keyword.get(opts, :result_handler, & &1)
unless pubsub do
Logger.warn(":pubsub option not supplied to `ChoreRunner.Reporter`")
end
send(self(), :broadcast)
{:ok,
%{
chore: Map.put(chore, :reporter, self()),
last_sent_chore: chore,
pubsub: pubsub,
finished_function: finished_function
}}
end
def start_link(init_opts, opts) do
merged_opts = Keyword.merge(init_opts, opts)
chore = Keyword.fetch!(merged_opts, :chore)
GenServer.start_link(__MODULE__, {merged_opts, chore}, name: name(chore))
end
defp report_started,
do: GenServer.cast(get_reporter_pid(), {:chore_started, DateTime.utc_now()})
defp report_finished,
do: GenServer.cast(get_reporter_pid(), {:chore_finished, DateTime.utc_now()})
def report_failed(reason),
do: GenServer.cast(get_reporter_pid(), {:chore_failed, reason, DateTime.utc_now()})
def log(message),
do: GenServer.cast(get_reporter_pid(), {:log, message, DateTime.utc_now()})
def set_counter(name, value), do: do_update_counter(name, value, :set)
def inc_counter(name, amount), do: do_update_counter(name, amount, :inc)
defp do_update_counter(name, value, operation),
do: GenServer.cast(get_reporter_pid(), {:update_counter, name, value, operation})
def get_chore_state(%Chore{} = chore) do
GenServer.call(name(chore), :chore_state)
end
def handle_call(:chore_state, _from, %{chore: chore} = state), do: {:reply, chore, state}
def handle_call(
{:start_chore_task, input, _opts},
_from,
%{chore: %Chore{mod: chore_mod, task: nil} = chore} = state
) do
reporter_pid = self()
task =
Task.Supervisor.async_nolink(ChoreSupervisor, fn ->
put_reporter_pid_in_process(reporter_pid)
lock_arg =
case chore_mod.restriction do
:none -> :none
:self -> chore_mod
:global -> :global
end
if try_lock(lock_arg) do
report_started()
chore_mod.run(input)
report_finished()
else
report_failed("Failed to acquire lock")
end
end)
new_chore = %Chore{chore | task: task}
{:reply, new_chore, %{state | chore: new_chore}}
end
def handle_call(:stop_chore, _from, state) do
ChoreSupervisor
|> Task.Supervisor.terminate_child(state.chore.task.pid)
|> case do
:ok ->
new_state =
%{state | chore: put_log(state.chore, "Stopping Chore", DateTime.utc_now())}
|> fail_chore("Stopped", DateTime.utc_now())
state.finished_function.(new_state.chore)
Task.async(fn ->
Process.sleep(10)
DynamicSupervisor.terminate_child(ChoreRunner.ReporterSupervisor, self())
end)
{:reply, :ok, new_state}
_ ->
{:reply, :error, state}
end
end
def handle_cast({:chore_started, timestamp}, state) do
new_state = put_in(state.chore.started_at, timestamp)
broadcast(new_state.pubsub, new_state.chore, :chore_started)
{:noreply, new_state}
end
def handle_cast({:chore_finished, timestamp}, state) do
new_state = put_in(state.chore.finished_at, timestamp)
broadcast(new_state.pubsub, new_state.chore, :chore_finished)
state.finished_function.(state.chore)
{:noreply, new_state}
end
def handle_cast({:chore_failed, reason, timestamp}, state) do
new_state = fail_chore(state, reason, timestamp)
{:noreply, new_state}
end
def handle_cast({:log, message, timestamp}, state) do
{:noreply, %{state | chore: put_log(state.chore, message, timestamp)}}
end
def handle_cast({:update_counter, name, value, operation}, state) when is_number(value),
do: {:noreply, update_in(state.chore.values[name], &do_update_values(&1, value, operation))}
defp fail_chore(state, reason, timestamp) do
new_state = put_in(state.chore.finished_at, timestamp)
new_state = %{
new_state
| chore: put_log(new_state.chore, "Failed with reason: #{reason}", timestamp)
}
broadcast(new_state.pubsub, new_state.chore, :chore_failed)
new_state
end
def handle_info(:broadcast, %{pubsub: nil} = state), do: {:noreply, state}
def handle_info(:broadcast, %{chore: %{finished_at: finished_at}} = state)
when not is_nil(finished_at),
do: {:noreply, state}
def handle_info(:broadcast, %{chore: chore, last_sent_chore: last_sent_chore} = state) do
Process.send_after(self(), :broadcast, 10)
if chore == last_sent_chore do
{:noreply, state}
else
broadcast(state.pubsub, diff_chore(last_sent_chore, chore), :chore_update)
{:noreply, %{state | last_sent_chore: chore}}
end
end
def handle_info({ref, result}, %{chore: %{task: %{ref: ref}}} = state),
do: {:noreply, put_in(state.chore.result, result)}
def handle_info({:DOWN, ref, _, _, _}, %{chore: %{task: %{ref: ref}}} = state),
do: {:stop, :normal, state}
defp diff_chore(prev, current) do
%Chore{current | logs: current.logs -- prev.logs}
end
defp do_update_values(nil, value, operation), do: do_update_values(0, value, operation)
defp do_update_values(original, value, :inc), do: original + value
defp do_update_values(_original, value, :set), do: value
defp put_log(%Chore{logs: logs} = chore, log, timestamp),
do: %Chore{chore | logs: [{log, timestamp} | logs]}
defp put_reporter_pid_in_process(reporter_pid), do: Process.put(@process_dict_key, reporter_pid)
defp get_reporter_pid do
case Process.get(@process_dict_key) do
nil ->
:"$callers"
|> Process.get()
|> Enum.find(fn pid ->
Process.info(pid, :dictionary)[@process_dict_key]
end)
|> case do
nil -> raise "Attempted to call a chore reporting function outside of a chore"
pid -> pid
end
pid ->
pid
end
end
defp try_lock(:none), do: true
defp try_lock(lock_type), do: :global.set_lock(lock_id(lock_type), all_nodes(), 0)
defp lock_id(:global), do: {__MODULE__, self()}
defp lock_id(chore_mod), do: {chore_mod, self()}
defp all_nodes, do: [node() | Node.list()]
defp name(%Chore{id: id}), do: {:global, {__MODULE__, id}}
defp broadcast(nil, _, _), do: :noop
defp broadcast(pubsub, chore, key) do
Phoenix.PubSub.broadcast(
pubsub,
ChoreRunner.chore_pubsub_topic(:all),
{key, chore}
)
Phoenix.PubSub.broadcast(
pubsub,
ChoreRunner.chore_pubsub_topic(chore),
{key, chore}
)
end
end