core/handler/executor_pool_helper.ex

# Copyright(c) 2015-2023 ACCESS CO., LTD. All rights reserved.

use Croma

defmodule AntikytheraCore.Handler.ExecutorPoolHelper do
  alias Croma.Result, as: R
  alias Antikythera.{GearName, Conn, Context}
  alias Antikythera.ExecutorPool.Id, as: EPoolId
  alias Antikythera.ExecutorPool.BadIdReason
  alias AntikytheraCore.MetricsUploader
  alias AntikytheraCore.Handler.{GearError, HelperModules}
  alias AntikytheraCore.Conn, as: CoreConn
  alias AntikytheraCore.ExecutorPool.Id, as: CoreEPoolId
  alias AntikytheraCore.ExecutorPool.WebsocketConnectionsCounter, as: WsCounter
  alias AntikytheraCore.Ets.GearActionRunnerPools
  alias AntikytheraCore.GearLog.Writer

  defun with_executor(
          conn :: v[Conn.t()],
          gear_name :: v[GearName.t()],
          helper_modules :: v[HelperModules.t()],
          f :: (pid, Conn.t() -> Conn.t())
        ) :: Conn.t() do
    case find_executor_pool(conn, gear_name, helper_modules) do
      {:ok, epool_id} -> run_within_executor_pool(conn, helper_modules, epool_id, f)
      {:error, reason} -> GearError.bad_executor_pool_id(conn, reason)
    end
  end

  defunp find_executor_pool(
           conn :: v[Conn.t()],
           gear_name :: v[GearName.t()],
           %HelperModules{top: top}
         ) :: R.t(EPoolId.t(), BadIdReason.t()) do
    top.executor_pool_for_web_request(conn) |> CoreEPoolId.validate_association(gear_name)
  end

  defunp run_within_executor_pool(
           %Conn{context: context} = conn1,
           helper_modules :: v[HelperModules.t()],
           epool_id :: v[EPoolId.t()],
           f :: (pid, Conn.t() -> Conn.t())
         ) :: Conn.t() do
    conn2 = %Conn{conn1 | context: %Context{context | executor_pool_id: epool_id}}

    try do
      PoolSup.Multi.transaction(GearActionRunnerPools.table_name(), epool_id, fn pid ->
        f.(pid, conn2)
      end)
    catch
      :exit, {:timeout, _} ->
        report_failure_in_checkout(helper_modules, epool_id)
        GearError.error(conn2, :timeout_in_epool_checkout, [])
    end
  end

  defp report_failure_in_checkout(%HelperModules{metrics_uploader: metrics_uploader}, epool_id) do
    data = {"web_timeout_in_epool_checkout", :sum, 1}
    MetricsUploader.submit(metrics_uploader, [data], epool_id)
  end

  defun increment_ws_count(
          %Conn{
            context: %Context{
              start_time: start_time,
              context_id: context_id,
              executor_pool_id: epool_id
            }
          } = conn,
          %{pid: ws_pid} = req :: :cowboy_req.req(),
          %HelperModules{logger: logger},
          f :: (() -> a)
        ) :: :cowboy_req.req() | a
        when a: any do
    case WsCounter.increment(epool_id, ws_pid) do
      :ok ->
        f.()

      {:error, :too_many_connections} ->
        Writer.error(
          logger,
          start_time,
          context_id,
          "Cannot establish new websocket connection: too many connections in executor pool #{inspect(epool_id)}"
        )

        GearError.ws_too_many_connections(conn) |> CoreConn.reply_as_cowboy_res(req)
    end
  end
end