defmodule Glific.State do
@moduledoc """
Manage simulator and flows, managing state and allocation to ensure we can have multiple simulators
and flow run at the same time
"""
use Publicist
use GenServer
require Logger
import Ecto.Query, warn: false
alias Glific.{
Communications,
State.Flow,
State.Simulator,
Users.User
}
# lets first define the genserver Server callbacks
@impl true
@doc false
@spec init(any) :: {:ok, %{}}
def init(_opts) do
# our state is a map of organization ids to simulator contexts
{:ok, reset_state()}
end
@impl true
@doc false
def handle_call({:get_simulator, user}, _from, state) do
{contact, state} = Simulator.get_simulator(user, state)
{:reply, contact, state, :hibernate}
end
@impl true
@doc false
def handle_call({:release_simulator, user}, _from, state) do
state = release_entity(user, state, :simulators)
{:reply, nil, state, :hibernate}
end
@impl true
@doc false
def handle_call({:state, organization_id}, _from, state) do
{:reply, get_state(state, organization_id), state, :hibernate}
end
@impl true
@doc false
def handle_call(:reset, _from, _state) do
{:reply, :ok, reset_state(), :hibernate}
end
@impl true
@doc false
def handle_call({:get_flow, params}, _from, state) do
{flow, state} =
Flow.get_flow(
%{user: params.user, flow_id: params.flow_id, is_forced: params.is_forced},
state
)
{:reply, flow, state, :hibernate}
end
@impl true
@doc false
def handle_call({:release_flow, user}, _from, state) do
state = release_entity(user, state, :flows)
{:reply, nil, state, :hibernate}
end
# Note that we are specifically not implementing the handle_cast callback
# since it does not make sense for the purposes of this interface
# lets define the client interface
@doc false
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
@doc false
def get_simulator(user) do
GenServer.call(__MODULE__, {:get_simulator, user})
end
@doc false
def release_simulator(user) do
GenServer.call(__MODULE__, {:release_simulator, user})
end
@doc false
def get_flow(user, flow_id, is_forced) do
GenServer.call(__MODULE__, {:get_flow, %{user: user, flow_id: flow_id, is_forced: is_forced}})
end
@doc false
def release_flow(user) do
GenServer.call(__MODULE__, {:release_flow, user})
end
@doc false
def state(organization_id) do
GenServer.call(__MODULE__, {:state, organization_id})
end
@doc false
def reset do
GenServer.call(__MODULE__, :reset)
end
@doc """
initializes the state for this organization
if not already present
"""
@spec get_state(map(), non_neg_integer) :: map()
def get_state(state, organization_id) do
if Map.has_key?(state, organization_id),
do: state[organization_id],
else: init_state(organization_id)
end
@spec init_state(non_neg_integer) :: map()
defp init_state(organization_id) do
%{}
|> Map.merge(Simulator.init_state(organization_id))
|> Map.merge(Flow.init_state(organization_id))
end
@spec reset_state :: map()
defp reset_state do
%{}
end
@doc """
Release the entity associated with this user id. It is possible
that there is no entity associated with this user
"""
@spec release_entity(User.t(), map(), atom()) :: map()
def release_entity(user, state, type) do
organization_id = user.organization_id
org_state =
get_state(state, organization_id)
|> free_entity(type, %{user: user, is_forced: false})
Map.put(state, organization_id, org_state)
end
@doc """
Free the entity after holding an entity period is over
"""
@spec free_entity(map(), atom(), map()) :: map()
def free_entity(
%{
flow: %{free: free_flows, busy: busy_flows}
} = state,
:flows,
%{user: user, is_forced: is_forced}
) do
{free, busy} =
do_free_entity(free_flows, busy_flows, %{
user: user,
is_forced: is_forced,
entity_type: :flows
})
update_state(state, :flow, free, busy)
end
def free_entity(
%{
simulator: %{free: free_simulators, busy: busy_simulators}
} = state,
:simulators,
%{user: user}
) do
{free, busy} =
do_free_entity(free_simulators, busy_simulators, %{
user: user,
is_forced: false,
entity_type: :simulators
})
update_state(state, :simulator, free, busy)
end
@doc false
@spec update_state(map(), atom(), list() | map(), map()) :: map()
def update_state(state, key, free, busy),
do: Map.put(state, key, %{free: free, busy: busy})
# we'll assign the simulator and flows for 10 minute intervals
@cache_time 10
@spec do_free_entity(map(), map(), map()) :: {map(), map()}
defp do_free_entity(free, busy, %{user: user, is_forced: true, entity_type: entity_type}) do
Enum.reduce(
busy,
{free, busy},
fn {{id, fingerprint}, {entity, _time}}, {free, busy} ->
Logger.info(
"Releasing entity: #{inspect(entity)} for user: #{user.name} of org_id: #{user.organization_id} by force."
)
publish_data(entity.organization_id, id, entity_type)
{
[entity | free],
Map.delete(busy, {id, fingerprint})
}
end
)
end
defp do_free_entity(free, busy, %{user: user, is_forced: _is_forced, entity_type: entity_type}) do
expiry_time = DateTime.utc_now() |> DateTime.add(-1 * @cache_time * 60, :second)
Enum.reduce(
busy,
{free, busy},
fn {{id, fingerprint}, {entity, time}}, {free, busy} ->
# when user already has entity flow assigned with same fingerprint
if (user && user.id == id && user.fingerprint == fingerprint) ||
DateTime.compare(time, expiry_time) == :lt do
Logger.info(
"Releasing entity: #{inspect(entity)} for user: #{user.name} of org_id: #{user.organization_id}."
)
publish_data(entity.organization_id, id, entity_type)
{
[entity | free],
Map.delete(busy, {id, fingerprint})
}
else
{free, busy}
end
end
)
end
# sending subscription when simulator is released
@spec publish_data(non_neg_integer, non_neg_integer, atom()) :: any() | nil
defp publish_data(organization_id, user_id, :simulators) do
Communications.publish_data(
%{"simulator_release" => %{user_id: user_id}},
:simulator_release,
organization_id
)
end
defp publish_data(_organization_id, _user_id, :flows), do: nil
end