defmodule SMPPEX.Session do
@moduledoc """
Module for implementing custom SMPP Session entities.
To implement an Session entitiy, one should implement several callbacks (`SMPPEX.Session` behaviour).
The most proper way to do it is to `use` `SMPPEX.Session`:
```
defmodule MySession do
use SMPPEX.Session
# ...Callback implementation
end
```
In this case all callbacks have reasonable defaults.
"""
alias :erlang, as: Erlang
alias __MODULE__, as: Session
alias SMPPEX.Pdu
alias SMPPEX.PduStorage
alias SMPPEX.RawPdu
alias SMPPEX.Session.AutoPduHandler
alias SMPPEX.Session.Defaults
alias SMPPEX.SMPPTimers
alias SMPPEX.TransportSession
require Logger
@behaviour TransportSession
defstruct [
:module,
:module_state,
:timers,
:pdus,
:auto_pdu_handler,
:response_limit,
# increment before use
:sequence_number,
:time,
:timer_resolution,
:tick_timer_ref
]
@default_call_timeout Defaults.default_call_timeout()
@type state :: term
@type request :: term
@type reason :: term
@type reply :: term
@type send_pdu_result :: TransportSession.send_pdu_result()
@type session :: pid
@type from :: TransportSession.from()
@doc """
Invoked when a session is started after a connection successfully established.
`args` argument is taken directly from `ESME.start_link` or `MC.start` call.
The return value should be either `{:ok, state}`, then the session will successfully start and the returned state will be later passed to the other callbacks, or `{:stop, reason}`, then the session will stop with the returned reason.
"""
@callback init(
socket :: TransportSession.socket(),
transport :: TransportSession.transport(),
args :: term
) ::
{:ok, state}
| {:stop, reason}
@doc """
Invoked when the session receives an incoming PDU (which is not a response PDU).
The callback return values indicate the following:
* `{:ok, state}` — use `state` as the new session state;
* `{:ok, pdus, state}` — use `state` as the new session state and additionally send `pdus` to the connection;
* `{:stop, reason, state}` — stop with reason `reason` and use `state` as the new session state.
"""
@callback handle_pdu(pdu :: Pdu.t(), state) ::
{:ok, state}
| {:ok, [Pdu.t()], state}
| {:stop, reason, state}
@doc """
Invoked when the session receives an incoming PDU which couldn't be correctly parsed.
The callback return values indicate the following:
* `{:ok, state}` — use `state` as the new session state;
* `{:ok, pdus, state}` — use `state` as the new session state and additionally send `pdus` to the connection;
* `{:stop, reason, state}` — stop with reason `reason` and use `state` as the new session state.
"""
@callback handle_unparsed_pdu(pdu :: RawPdu.t(), error :: term, state) ::
{:ok, state}
| {:ok, [Pdu.t()], state}
| {:stop, reason, state}
@doc """
Invoked when the session receives a response to a previously sent PDU.
`pdu` argument contains the received response PDU, `original_pdu` contains
the previously sent pdu for which the handled response is received.
The callback return values indicate the following:
* `{:ok, state}` — use `state` as the new session state;
* `{:ok, pdus, state}` — use `state` as the new session state and additionally send `pdus` to the connection;
* `{:stop, reason, state}` — stop with reason `reason` and use `state` as the new session state.
"""
@callback handle_resp(pdu :: Pdu.t(), original_pdu :: Pdu.t(), state) ::
{:ok, state}
| {:ok, [Pdu.t()], state}
| {:stop, reason, state}
@doc """
Invoked when the session does not receive a response to a previously sent PDU
within the specified timeout.
`pdu` argument contains the PDU for which no response was received. If the response
will be received later it will be dropped (with an `info` log message).
The callback return values indicate the following:
* `{:ok, state}` — use `state` as the new session state;
* `{:ok, pdus, state}` — use `state` as the new session state and additionally send `pdus` to the connection;
* `{:stop, reason, state}` — stop with reason `reason` and use `state` as the new session state.
"""
@callback handle_resp_timeout(pdus :: [Pdu.t()], state) ::
{:ok, state}
| {:ok, [Pdu.t()], state}
| {:stop, reason, state}
@doc """
Invoked when the session is about to be stopped due to a timeout.
The return value will be used as the exit reason of the session process. The default implementation returns `{:timers, reason}`.
"""
@callback handle_timeout(SMPPEX.SMPPTimers.stop_reason(), state) :: exit_reason :: term
@doc """
Invoked when the SMPP session successfully sent PDU to transport or failed to do this.
`pdu` argument contains the PDU for which send status is reported. `send_pdu_result` can be
either `:ok` or `{:error, reason}`.
The returned value is used as the new state.
"""
@callback handle_send_pdu_result(pdu :: Pdu.t(), send_pdu_result, state) :: state
@doc """
Invoked when the connection's socket reported a error.
The returned value should be `{reason, state}`. The session stops then with `reason`.
"""
@callback handle_socket_error(error :: term, state) :: {exit_reason :: term, state}
@doc """
Invoked when the connection is closed by the peer.
The returned value should be `{reason, state}`. The session stops then with `reason`.
"""
@callback handle_socket_closed(state) :: {exit_reason :: term, state}
@doc """
Invoked to handle an arbitrary syncronous `request` sent to the session with `Session.call/3` method.
`from` argument can be used to send a response asyncronously via `Session.reply/2`.
The returned values indicate the following:
* `{:reply, reply, state}` — reply with `reply` and use `state` as the new state;
* `{:reply, reply, pdus, state}` — reply with `reply`, use `state` as the new state and additionally send `pdus` to the peer;
* `{:noreply, state}` — do not reply and use `state` as the new state. The reply can be send later via `Session.reply`;
* `{:noreply, pdus, state}` — do not reply, use `state` as the new state and additionally send `pdus` to the peer. The reply can be send later via `Session.reply`;
* `{:stop, reason, reply, state}` — reply with `reply`, use `state` as the new state and exit with `reason`;
* `{:stop, reason, state}` — do not reply, use `state` as the new state and exit with `reason`.
"""
@callback handle_call(request, from, state) ::
{:reply, reply, state}
| {:reply, reply, [Pdu.t()], state}
| {:noreply, state}
| {:noreply, [Pdu.t()], state}
| {:stop, reason, reply, state}
| {:stop, reason, state}
@doc """
Invoked to handle an arbitrary asyncronous `request` sent to the session with `Session.cast/2` method.
The returned values indicate the following:
* `{:noreply, state}` — use `state` as the new state;
* `{:noreply, pdus, state}` — use `state` as the new state and additionally send `pdus` to the peer.;
* `{:stop, reason, state}` — use `state` as the new state and exit with `reason`.
"""
@callback handle_cast(request, state) ::
{:noreply, state}
| {:noreply, [Pdu.t()], state}
| {:stop, reason, state}
@doc """
Invoked to handle a generic message `request` sent to the session process.
The returned values indicate the following:
* `{:noreply, state}` — use `state` as the new state;
* `{:noreply, pdus, state}` — use `state` as the new state and additionally send `pdus` to the peer.;
* `{:stop, reason, state}` — use `state` as the new state and exit with `reason`.
"""
@callback handle_info(request, state) ::
{:noreply, state}
| {:noreply, [Pdu.t()], state}
| {:stop, reason, state}
@doc """
Invoked when the session process is about to exit.
`lost_pdus` contain a list of nonresp `pdus` sent by the session to the peer and which have not yet received a response.
The returned value is either `:stop` or `{:stop, last_pdus, state}`, where `last_pdus` is a list of PDUs which will be sent to the peer before socket close, and `state` is the new state. For example, an ESME can send an unbind PDU or an MC can send negative resps for pending `submit_sm`s if needed.
This callback is called from the underlying `GenServer` `terminate` callbacks, so it has all the corresponding caveats, for example, sometimes it may not be called, see [`GenServer.terminate/2` docs](https://hexdocs.pm/elixir/GenServer.html#c:terminate/2).
"""
@callback terminate(reason, lost_pdus :: [Pdu.t()], state) ::
:stop
| {:stop, [Pdu.t()], state}
@doc """
Invoked to change the state of the session when a different version of a module is loaded (hot code swapping) and the state’s term structure should be changed. The method has the same semantics as the original `GenServer.code_change/3` callback.
"""
@callback code_change(old_vsn :: term | {:down, term}, state, extra :: term) ::
{:ok, state}
| {:error, reason}
defmacro __using__(_) do
quote location: :keep do
@behaviour SMPPEX.Session
require Logger
@impl SMPPEX.Session
def init(_socket, _transport, args) do
{:ok, args}
end
@impl SMPPEX.Session
def handle_pdu(_pdu, state), do: {:ok, state}
@impl SMPPEX.Session
def handle_unparsed_pdu(_pdu, _error, state), do: {:ok, state}
@impl SMPPEX.Session
def handle_resp(_pdu, _original_pdu, state), do: {:ok, state}
@impl SMPPEX.Session
def handle_resp_timeout(_pdus, state), do: {:ok, state}
@impl SMPPEX.Session
def handle_timeout(reason, _state) do
Logger.info("Session #{inspect(self())}, being stopped by timers(#{reason})")
{:timers, reason}
end
@impl SMPPEX.Session
def handle_send_pdu_result(_pdu, _result, state), do: state
@impl SMPPEX.Session
def handle_socket_error(error, state), do: {{:socket_error, error}, state}
@impl SMPPEX.Session
def handle_socket_closed(state), do: {:socket_closed, state}
@impl SMPPEX.Session
def handle_call(_request, _from, state), do: {:reply, :ok, state}
@impl SMPPEX.Session
def handle_cast(_request, state), do: {:noreply, state}
@impl SMPPEX.Session
def handle_info(_request, state), do: {:noreply, state}
@impl SMPPEX.Session
def terminate(reason, lost_pdus, _state) do
Logger.info(
"Session #{inspect(self())} stopped with reason: #{inspect(reason)}, lost_pdus: #{inspect(lost_pdus)}"
)
:stop
end
@impl SMPPEX.Session
def code_change(_vsn, state, _extra), do: {:ok, state}
defoverridable init: 3,
handle_pdu: 2,
handle_unparsed_pdu: 3,
handle_resp: 3,
handle_resp_timeout: 2,
handle_timeout: 2,
handle_send_pdu_result: 3,
handle_socket_error: 2,
handle_socket_closed: 1,
handle_call: 3,
handle_cast: 2,
handle_info: 2,
terminate: 3,
code_change: 3
end
end
# Public interface
@spec send_pdu(session, Pdu.t()) :: :ok
@doc """
Sends a PDU from the session to the peer.
"""
def send_pdu(pid, pdu, timeout \\ @default_call_timeout) do
TransportSession.call(pid, {:send_pdu, pdu}, timeout)
end
@spec stop(session) :: :ok
@doc """
Stops the session syncronously.
"""
def stop(pid, reason \\ :normal) do
TransportSession.call(pid, {:stop, reason})
end
@spec call(session, request :: term, timeout) :: term
@doc """
Makes a syncronous call to the session.
The call is handled by `handle_call/3` `SMPPEX.Session` callback.
"""
def call(pid, request, timeout \\ @default_call_timeout) do
TransportSession.call(pid, {:call, request}, timeout)
end
@spec cast(session, request :: term) :: :ok
@doc """
Makes an asyncronous call to Session.
The call is handled by `handle_cast/2` `SMPPEX.Session` callback.
"""
def cast(pid, request) do
TransportSession.cast(pid, {:cast, request})
end
@spec reply(from, response :: term) :: :ok
@doc """
Replies to a client calling `Session.call` method.
This function can be used to explicitly send a reply to a client that called `call/3`.
`from` must be the `from` argument (the second argument) accepted by `handle_call/3` callbacks.
The return value is always `:ok`.
"""
def reply(from, response) do
TransportSession.reply(from, response)
end
# SMPP.TransportSession callbacks
@impl TransportSession
def init(socket, transport, [{module, args}, session_opts]) do
case module.init(socket, transport, args) do
{:ok, state} ->
timer_resolution =
Keyword.get(session_opts, :timer_resolution, Defaults.timer_resolution())
timer_ref = Erlang.start_timer(timer_resolution, self(), :emit_tick)
enquire_link_limit =
Keyword.get(session_opts, :enquire_link_limit, Defaults.enquire_link_limit())
enquire_link_resp_limit =
Keyword.get(session_opts, :enquire_link_resp_limit, Defaults.enquire_link_resp_limit())
inactivity_limit =
Keyword.get(session_opts, :inactivity_limit, Defaults.inactivity_limit())
session_init_limit =
Keyword.get(session_opts, :session_init_limit, Defaults.session_init_limit())
time = :erlang.monotonic_time(:millisecond)
timers =
SMPPTimers.new(
time,
session_init_limit,
enquire_link_limit,
enquire_link_resp_limit,
inactivity_limit
)
pdu_storage = PduStorage.new()
response_limit = Keyword.get(session_opts, :response_limit, Defaults.response_limit())
auto_pdu_handler = AutoPduHandler.new()
sequence_number = Keyword.get(session_opts, :sequence_number, 0)
{:ok,
%Session{
module: module,
module_state: state,
timers: timers,
pdus: pdu_storage,
auto_pdu_handler: auto_pdu_handler,
response_limit: response_limit,
sequence_number: sequence_number,
time: time,
timer_resolution: timer_resolution,
tick_timer_ref: timer_ref
}}
{:stop, _} = stop ->
stop
end
end
@impl TransportSession
def handle_pdu({:unparsed_pdu, raw_pdu, error}, st) do
{st.module.handle_unparsed_pdu(raw_pdu, error, st.module_state), st}
|> process_handle_unparsed_pdu_reply()
end
def handle_pdu({:pdu, pdu}, st) do
new_st = update_timers_with_incoming_pdu(pdu, st)
case AutoPduHandler.handle_pdu(new_st.auto_pdu_handler, pdu, new_st.sequence_number) do
:proceed ->
handle_pdu_by_callback_module(pdu, new_st)
{:skip, pdus, new_sequence_number} ->
{:ok, pdus, %Session{new_st | sequence_number: new_sequence_number}}
end
end
@impl TransportSession
def handle_send_pdu_result(pdu, send_pdu_result, st) do
new_st = update_timers_with_outgoing_pdu(pdu, send_pdu_result, st)
with {:error, _} <- send_pdu_result do
_ = PduStorage.fetch(new_st.pdus, Pdu.sequence_number(pdu))
end
case AutoPduHandler.handle_send_pdu_result(new_st.auto_pdu_handler, pdu) do
:proceed ->
new_module_state =
st.module.handle_send_pdu_result(pdu, send_pdu_result, new_st.module_state)
%Session{new_st | module_state: new_module_state}
:skip ->
new_st
end
end
@impl TransportSession
def handle_call({:send_pdu, pdu}, _from, st) do
{{:reply, :ok, [pdu], st.module_state}, st}
|> process_handle_call_reply()
end
def handle_call({:stop, reason}, _from, st) do
{{:stop, reason, :ok, st.module_state}, st}
|> process_handle_call_reply()
end
def handle_call({:call, request}, from, st) do
{st.module.handle_call(request, from, st.module_state), st}
|> process_handle_call_reply()
end
def handle_call(request, from, st) do
handle_call({:call, request}, from, st)
end
@impl TransportSession
def handle_cast({:cast, request}, st) do
{st.module.handle_cast(request, st.module_state), st}
|> process_handle_cast_reply()
end
def handle_cast(request, st) do
handle_cast({:cast, request}, st)
end
@impl TransportSession
def handle_info({:timeout, _timer_ref, :emit_tick}, st) do
new_tick_timer_ref = Erlang.start_timer(st.timer_resolution, self(), :emit_tick)
Erlang.cancel_timer(st.tick_timer_ref)
Kernel.send(self(), {:tick, :erlang.monotonic_time(:millisecond)})
{:noreply, [], %Session{st | tick_timer_ref: new_tick_timer_ref}}
end
def handle_info({:tick, time}, st) do
Kernel.send(self(), {:check_timers, time})
Kernel.send(self(), {:check_expired_pdus, time})
{:noreply, [], %Session{st | time: time}}
end
def handle_info({:check_timers, time}, st) do
check_timers(time, st)
end
def handle_info({:check_expired_pdus, time}, st) do
check_expired_pdus(time, st)
end
def handle_info(request, st) do
{st.module.handle_info(request, st.module_state), st}
|> process_handle_info_reply()
end
@impl TransportSession
def handle_socket_closed(st) do
{reason, new_module_state} = st.module.handle_socket_closed(st.module_state)
{reason, %Session{st | module_state: new_module_state}}
end
@impl TransportSession
def handle_socket_error(error, st) do
{reason, new_module_state} = st.module.handle_socket_error(error, st.module_state)
{reason, %Session{st | module_state: new_module_state}}
end
@impl TransportSession
def terminate(reason, st) do
lost_pdus = PduStorage.fetch_all(st.pdus)
case st.module.terminate(reason, lost_pdus, st.module_state) do
:stop ->
{[], st}
{:stop, pdus, new_module_state} when is_list(pdus) ->
{pdus, %Session{st | module_state: new_module_state}}
other ->
exit({:bad_terminate_reply, other})
end
end
@impl TransportSession
def code_change(old_vsn, st, extra) do
case st.module.code_change(old_vsn, st.module_state, extra) do
{:ok, new_module_state} ->
{:ok, %Session{st | module_state: new_module_state}}
{:error, _} = err ->
err
end
end
# Private
defp handle_pdu_by_callback_module(pdu, st) do
if Pdu.resp?(pdu) do
pdu
|> handle_resp_pdu(st)
|> process_handle_resp_reply()
else
pdu
|> handle_non_resp_pdu(st)
|> process_handle_pdu_reply()
end
end
defp handle_non_resp_pdu(pdu, st) do
{st.module.handle_pdu(pdu, st.module_state), st}
end
defp handle_resp_pdu(pdu, st) do
sequence_number = Pdu.sequence_number(pdu)
case PduStorage.fetch(st.pdus, sequence_number) do
[] ->
Logger.info(
"Session #{inspect(self())}, resp for unknown pdu(sequence_number: #{sequence_number}), dropping"
)
{{:ok, st.module_state}, st}
[original_pdu] ->
{st.module.handle_resp(pdu, original_pdu, st.module_state), st}
end
end
defp update_timers_with_incoming_pdu(pdu, st) do
new_timers =
cond do
Pdu.bind_resp?(pdu) && Pdu.success_resp?(pdu) ->
st.timers
|> SMPPTimers.handle_bind(st.time)
|> SMPPTimers.handle_peer_transaction(st.time)
Pdu.resp?(pdu) ->
st.timers
|> SMPPTimers.handle_peer_action(st.time)
true ->
st.timers
|> SMPPTimers.handle_peer_transaction(st.time)
end
%Session{st | timers: new_timers}
end
defp update_timers_with_outgoing_pdu(pdu, send_pdu_result, st) do
new_timers =
if send_pdu_result == :ok and Pdu.bind_resp?(pdu) and Pdu.success_resp?(pdu) do
st.timers
|> SMPPTimers.handle_bind(st.time)
else
st.timers
end
%Session{st | timers: new_timers}
end
defp check_expired_pdus(time, st) do
AutoPduHandler.drop_expired(st.auto_pdu_handler, time)
case PduStorage.fetch_expired(st.pdus, time) do
[] ->
{:noreply, [], st}
pdus ->
module_reply = st.module.handle_resp_timeout(pdus, st.module_state)
process_handle_resp_timeout_reply({module_reply, st})
end
end
defp check_timers(time, st) do
case SMPPTimers.handle_tick(st.timers, time) do
{:ok, new_timers} ->
new_st = %Session{st | timers: new_timers}
{:noreply, [], new_st}
{:stop, reason} ->
exit_reason = st.module.handle_timeout(reason, st.module_state)
{:stop, exit_reason, [], st}
{:enquire_link, new_timers} ->
{enquire_link, new_sequence_number} =
AutoPduHandler.enquire_link(
st.auto_pdu_handler,
time + st.response_limit,
st.sequence_number
)
{:noreply, [enquire_link],
%Session{
st
| sequence_number: new_sequence_number,
timers: new_timers
}}
end
end
defp save_sent_pdus(pdus, st, pdus_to_send \\ [])
defp save_sent_pdus([], st, pdus_to_send), do: {st, Enum.reverse(pdus_to_send)}
defp save_sent_pdus([pdu | pdus], st, pdus_to_send) do
if Pdu.resp?(pdu) do
save_sent_pdus(pdus, st, [pdu | pdus_to_send])
else
sequence_number = st.sequence_number + 1
new_pdu = %Pdu{pdu | sequence_number: sequence_number}
true = PduStorage.store(st.pdus, new_pdu, st.time + st.response_limit)
new_st = %Session{st | sequence_number: sequence_number}
save_sent_pdus(pdus, new_st, [new_pdu | pdus_to_send])
end
end
defp process_handle_pdu_reply({{:ok, _mst}, _st} = arg), do: process_reply(arg)
defp process_handle_pdu_reply({{:ok, _pdus, _mst}, _st} = arg), do: process_reply(arg)
defp process_handle_pdu_reply({{:stop, _reason, _mst}, _st} = arg), do: process_reply(arg)
defp process_handle_pdu_reply({reply, st}), do: {:stop, {:bad_handle_pdu_reply, reply}, [], st}
defp process_handle_unparsed_pdu_reply({{:ok, _mst}, _st} = arg), do: process_reply(arg)
defp process_handle_unparsed_pdu_reply({{:ok, _pdus, _mst}, _st} = arg), do: process_reply(arg)
defp process_handle_unparsed_pdu_reply({{:stop, _reason, _mst}, _st} = arg),
do: process_reply(arg)
defp process_handle_unparsed_pdu_reply({reply, st}),
do: {:stop, {:bad_handle_unparsed_pdu_reply, reply}, [], st}
defp process_handle_resp_reply({{:ok, _mst}, _st} = arg), do: process_reply(arg)
defp process_handle_resp_reply({{:ok, _pdus, _mst}, _st} = arg), do: process_reply(arg)
defp process_handle_resp_reply({{:stop, _reason, _mst}, _st} = arg), do: process_reply(arg)
defp process_handle_resp_reply({reply, st}),
do: {:stop, {:bad_handle_resp_reply, reply}, [], st}
defp process_handle_resp_timeout_reply({{:ok, mst}, st}),
do: process_reply({{:noreply, mst}, st})
defp process_handle_resp_timeout_reply({{:ok, pdus, mst}, st}),
do: process_reply({{:noreply, pdus, mst}, st})
defp process_handle_resp_timeout_reply({{:stop, _reason, _mst}, _st} = arg),
do: process_reply(arg)
defp process_handle_resp_timeout_reply({reply, st}),
do: {:stop, {:bad_handle_resp_timeout_reply, reply}, [], st}
defp process_handle_call_reply({{:reply, _reply, _mst}, _st} = arg), do: process_reply(arg)
defp process_handle_call_reply({{:reply, _reply, _pdus, _mst}, _st} = arg),
do: process_reply(arg)
defp process_handle_call_reply({{:noreply, _pdus, _mst}, _st} = arg), do: process_reply(arg)
defp process_handle_call_reply({{:noreply, _mst}, _st} = arg), do: process_reply(arg)
defp process_handle_call_reply({{:stop, _rsn, _reply, _mst}, _st} = arg), do: process_reply(arg)
defp process_handle_call_reply({{:stop, _rsn, _mst}, _st} = arg), do: process_reply(arg)
defp process_handle_call_reply({reply, st}),
do: {:stop, {:bad_handle_call_reply, reply}, [], st}
defp process_handle_cast_reply({{:noreply, _pdus, _mst}, _st} = arg), do: process_reply(arg)
defp process_handle_cast_reply({{:noreply, _mst}, _st} = arg), do: process_reply(arg)
defp process_handle_cast_reply({{:stop, _rsn, _mst}, _st} = arg), do: process_reply(arg)
defp process_handle_cast_reply({reply, st}),
do: {:stop, {:bad_handle_cast_reply, reply}, [], st}
defp process_handle_info_reply({{:noreply, _pdus, _mst}, _st} = arg), do: process_reply(arg)
defp process_handle_info_reply({{:noreply, _mst}, _st} = arg), do: process_reply(arg)
defp process_handle_info_reply({{:stop, _rsn, _mst}, _st} = arg), do: process_reply(arg)
defp process_handle_info_reply({reply, st}),
do: {:stop, {:bad_handle_info_reply, reply}, [], st}
defp process_reply({{:ok, module_state}, st}) do
{:ok, [], %Session{st | module_state: module_state}}
end
defp process_reply({{:ok, pdus, module_state}, st}) do
{new_st, pdus_to_send} = save_sent_pdus(pdus, st)
{:ok, pdus_to_send, %Session{new_st | module_state: module_state}}
end
defp process_reply({{:reply, reply, module_state}, st}) do
{:reply, reply, [], %Session{st | module_state: module_state}}
end
defp process_reply({{:reply, reply, pdus, module_state}, st}) do
{new_st, pdus_to_send} = save_sent_pdus(pdus, st)
{:reply, reply, pdus_to_send, %Session{new_st | module_state: module_state}}
end
defp process_reply({{:noreply, module_state}, st}) do
{:noreply, [], %Session{st | module_state: module_state}}
end
defp process_reply({{:noreply, pdus, module_state}, st}) do
{new_st, pdus_to_send} = save_sent_pdus(pdus, st)
{:noreply, pdus_to_send, %Session{new_st | module_state: module_state}}
end
defp process_reply({{:stop, reason, reply, module_state}, st}) do
{:stop, reason, reply, [], %Session{st | module_state: module_state}}
end
defp process_reply({{:stop, reason, module_state}, st}) do
{:stop, reason, [], %Session{st | module_state: module_state}}
end
end