lib/edeliver/relup/instructions/finish_running_requests.ex

defmodule Edeliver.Relup.Instructions.FinishRunningRequests do
  @moduledoc """
  Notifies request processes that a release upgrade starts.

  This upgrade instruction waits a short time until the current
  requests have finished and notifies the remaining ones that a
  code upgrade is about to happen. If a `phoenix` version is used
  which supports the upgrade notification feature, the remaining
  requests that did not finish but failed during the upgrade will
  be replayed with the original request when the code upgrade is
  done.

  This instruction should be used in conjunction with and after the
  `Edeliver.Relup.Instructions.SuspendRanchAcceptors` instruction,
  which prevents new requests from being accepted during the upgrade.

  To make sure that the http request connections can be found on the
  node, use this instruction after the
  `Edeliver.Relup.Instructions.CheckRanchConnections` instruction,
  which aborts the upgrade if the http request connections accepted
  by ranch cannot be found in the supervision tree.
  """
  use Edeliver.Relup.RunnableInstruction
  alias Edeliver.Relup.Instructions.CheckRanchAcceptors
  alias Edeliver.Relup.Instructions.CheckRanchConnections

  # Time in ms to wait for running requests to finish (passed to `run/1`).
  @default_timeout 500

  @doc """
  Appends this instruction to the instructions after the
  "point of no return" but before any instruction which loads or
  unloads new code, (re-)starts or stops any running processes, or
  (re-)starts or stops any application or the emulator.
  """
  def insert_where, do: &append_after_point_of_no_return/2

  @doc """
  Returns the name of the application and the timeout in ms to wait
  until running requests finish.

  These values are passed as argument to the `run/1` function.
  """
  def arguments(_instructions = %Instructions{}, _config = %{name: name}) when is_atom(name) do
    {name, @default_timeout}
  end
  def arguments(_instructions = %Instructions{}, _config = %{name: name}) when is_binary(name) do
    {name |> String.to_atom, @default_timeout}
  end

  @doc """
  This module depends on the `Edeliver.Relup.Instructions.CheckRanchAcceptors`
  and the `Edeliver.Relup.Instructions.CheckRanchConnections` modules,
  which must be loaded before this instruction for upgrades and unloaded
  after this instruction for downgrades.
  """
  @spec dependencies() :: [module]
  def dependencies do
    [CheckRanchAcceptors, CheckRanchConnections]
  end

  @doc """
  Waits until the given processes have terminated.

  Waits up to `timeout` ms and then returns the pids of the processes
  which are still running. Returns `[]` as soon as all processes have
  terminated, without waiting for the full timeout.
  """
  @spec bulk_wait_for_termination(processes :: [pid], timeout :: non_neg_integer) :: [pid]
  def bulk_wait_for_termination(_processes = [], _timeout), do: []
  def bulk_wait_for_termination(processes, timeout) do
    caller = self()
    # Unique tag so the replies of the waiting process cannot be confused
    # with unrelated messages in the caller's mailbox.
    tag = make_ref()
    # The monitors are owned by a separate, linked process, so `:DOWN` messages
    # of requests which finish after the timeout never reach the caller's mailbox.
    waiting_pid = Process.spawn(fn ->
      monitors = for pid <- processes, do: {pid, Process.monitor(pid)}
      await_termination(monitors, caller, tag)
    end, [:link])
    receive do
      {^tag, :all_terminated} -> []
    after timeout ->
      send waiting_pid, {tag, :timeout}
      receive do
        # The waiting process may have seen the last `:DOWN` message right
        # before our timeout message; it then replies `:all_terminated` and
        # exits instead of answering the timeout.
        {^tag, :all_terminated} -> []
        {^tag, :remaining, remaining_pids} -> remaining_pids
      end
    end
  end

  @doc """
  Sends the given event to all processes representing http requests.
  """
  @spec notify_running_requests([pid], event :: term) :: :ok
  def notify_running_requests(request_pids, event) do
    Enum.each(request_pids, &send(&1, event))
  end

  @doc """
  Waits `timeout` milliseconds until the current http requests have
  finished and notifies the remaining request processes that a code
  upgrade is running and new code will be loaded. This enables phoenix
  to rerun requests which failed during code loading.

  Detected websocket channel connections are not waited for.
  Always returns `:ok`.
  """
  @spec run({otp_application_name :: atom, timeout :: non_neg_integer}) :: :ok
  def run({otp_application_name, timeout}) do
    info "Waiting up to #{inspect timeout} ms until running requests finished..."
    ranch_listener_sup = CheckRanchAcceptors.ranch_listener_sup(otp_application_name)
    assume true = is_pid(ranch_listener_sup), "Failed to wait until requests finished. Ranch listener supervisor not found."
    ranch_connections_sup = CheckRanchConnections.ranch_connections_sup(ranch_listener_sup)
    assume true = is_pid(ranch_connections_sup), "Failed to wait until requests finished. Ranch connections supervisor not found."
    assume true = is_list(connections = CheckRanchConnections.ranch_connections(ranch_connections_sup)), "Failed to wait until requests finished. No connection processes found."
    # Only plain http request connections are waited for; websocket channel
    # connections are excluded when they can be detected.
    request_connections = case CheckRanchConnections.websocket_channel_connections(otp_application_name, connections) do
      [] -> connections
      websocket_connections = [_|_] -> connections -- websocket_connections
      :not_detected -> connections
    end
    requests_count = length(request_connections)
    if requests_count == 0 do
      info "No requests running."
    else
      info "Waiting for #{inspect requests_count} requests..."
      remaining_requests = bulk_wait_for_termination(request_connections, timeout)
      remaining_requests_count = length(remaining_requests)
      info "#{inspect requests_count - remaining_requests_count} requests finished."
      if remaining_requests_count > 0 do
        info "#{inspect remaining_requests_count} requests will be restarted after upgrade if they failed."
        notify_running_requests(remaining_requests, :upgrade_started)
      end
    end
    # Explicit return value: the nested `if` above would otherwise yield `nil`
    # and violate the `:: :ok` spec.
    :ok
  end

  # Runs in the waiting process spawned by `bulk_wait_for_termination/2`.
  # Consumes `:DOWN` messages until either all monitored processes have
  # terminated (replies `{tag, :all_terminated}`) or the caller signals the
  # timeout (replies `{tag, :remaining, pids}` with the still running pids).
  # `monitors` is a list of `{pid, monitor_ref}` tuples.
  defp await_termination([], caller, tag), do: send(caller, {tag, :all_terminated})
  defp await_termination(monitors, caller, tag) do
    receive do
      {:DOWN, monitor_ref, :process, _pid, _reason} ->
        await_termination(List.keydelete(monitors, monitor_ref, 1), caller, tag)
      {^tag, :timeout} ->
        send caller, {tag, :remaining, Enum.map(monitors, &elem(&1, 0))}
    end
  end
end