# lib/load/worker.ex

defmodule Load.Worker do
  @moduledoc """
  GenServer that periodically runs a simulation module against a single host.

  The worker state is a plain map built from the start arguments, the map
  returned by the simulation's `init/0`, the configured intervals and the map
  returned by `Stats.empty/0`. `@connect_delay` milliseconds after start the
  worker opens a `:gun` connection to `state.host`/`state.port`; once the
  connection is up it calls `sim.run/1` every `state.interval_ms` milliseconds.

  `hit/4` is the request helper (presumably called from the simulation's
  `run/1` — confirm against the simulation modules). It increments the
  `:requests`, `:succeeded` and `:failed` counters, which must already be
  present in the state.
  """

  use GenServer, restart: :transient

  require Logger

  @connect_delay 200
  @req_timeout :timer.seconds(5)

  @doc """
  Starts a worker.

  `glob` and `args` are concatenated and converted to a map, so both must be
  lists of `{key, value}` pairs; a key given in `args` overrides the same key
  in `glob`. The resulting map must contain `:sim` (see `init/1`).
  """
  def start_link(glob, args \\ []) do
    GenServer.start_link(__MODULE__, Enum.into(glob ++ args, %{}))
  end

  @doc """
  Builds the initial state and schedules the `:connect` message.

  Intervals are read from the `:load` application environment:

    * `:worker_timeunit` / `:worker_interval` (default `5` `:seconds`)
      become `:interval_ms`
    * `:worker_stats_timeunit` / `:worker_stats_interval` (default `1`
      `:seconds`) become `:stats_interval_ms`

  The time unit is the name of a `:timer` conversion function.
  """
  @impl true
  def init(args) do
    Logger.debug("init called with args: #{inspect(args)}")

    state =
      args
      |> Map.merge(args.sim.init())
      |> Map.put(:interval_ms, interval_from_env(:worker_timeunit, :worker_interval, 5))
      |> Map.put(
        :stats_interval_ms,
        interval_from_env(:worker_stats_timeunit, :worker_stats_interval, 1)
      )
      |> Map.merge(Stats.empty())

    Process.send_after(self(), :connect, @connect_delay)

    {:ok, state}
  end

  # Opens the :gun connection. On success the connection is stored under
  # `:conn` and the first `:run` is scheduled; on failure the worker stops with
  # reason `:normal`, so a `:transient` worker is not restarted.
  @impl true
  def handle_info(:connect, %{host: host, port: port, opts: _opts} = state) do
    Logger.debug("connect state: #{inspect(state)}")

    case :gun.open(host, port) do
      {:ok, conn} ->
        case :gun.await_up(conn) do
          {:ok, _transport} ->
            Process.send_after(self(), :run, 0)
            {:noreply, Map.put(state, :conn, conn)}

          err ->
            Logger.warn("gun.await_up: #{inspect(err)}")
            {:stop, :normal, state}
        end

      err ->
        Logger.warn("gun.open: #{inspect(err)}")
        {:stop, :normal, state}
    end
  end

  # Runs one simulation step, lets Stats update itself, and re-arms the timer.
  def handle_info(:run, %{sim: sim, interval_ms: interval_ms} = state) do
    state =
      state
      |> sim.run()
      |> Stats.maybe_update()

    Process.send_after(self(), :run, interval_ms)
    {:noreply, state}
  end

  # :gun delivers connection and stream notifications (for example gun_down,
  # gun_up, or a response arriving after `:gun.await/3` timed out) to the owner
  # process. Without this clause any such message raised FunctionClauseError
  # and took the worker down. `:connect` and `:run` are excluded on purpose so
  # that a state missing the keys those clauses require still fails loudly.
  def handle_info(msg, state) when msg not in [:connect, :run] do
    Logger.warn("ignoring unexpected message: #{inspect(msg)}")
    {:noreply, state}
  end

  @doc """
  Sends one request and returns the outcome together with the updated state.

  The transport is selected by `state.opts`:

    * `%{protocols: [:http], transport: :tcp}` — `target` must be
      `"POST <path>"`; the payload is posted over the worker's `:gun`
      connection and the response body is returned.
    * `%{protocols: [:ilp_packet], transport: :tcp}` — `payload` is written to
      a fresh TCP connection, which is closed again; no reply is read.

  Returns `{:ok, response, state}` or `{:error, reason, state}`. Unsupported
  options, unsupported verbs and malformed targets are reported as errors and
  counted under `:failed`.
  """
  def hit(target, headers, payload, %{opts: opts} = state) do
    case opts do
      %{protocols: [:http], transport: :tcp} ->
        hit_http(target, headers, payload, state)

      %{protocols: [:ilp_packet], transport: :tcp} ->
        hit_ilp_packet(payload, state)

      unsupported ->
        {:error, "not_implemented #{inspect(unsupported)}", bump(state, :failed)}
    end
  end

  # Issues an HTTP request described by a "VERB path" target. Only POST is
  # implemented.
  defp hit_http(target, headers, payload, %{host: host, port: port, conn: conn} = state) do
    case String.split(target, " ") do
      ["POST", path] ->
        url = "http://#{host}:#{port}#{path}"
        Logger.debug("hitting #{url}")
        post_ref = :gun.post(conn, url, headers, payload)
        handle_http_result(post_ref, bump(state, :requests))

      [verb, _path] ->
        {:error, "http tcp #{verb} not_implemented", bump(state, :failed)}

      _malformed ->
        # Previously a target that was not exactly "VERB path" raised MatchError.
        {:error, "malformed target #{inspect(target)}", bump(state, :failed)}
    end
  end

  # Writes the payload to a short-lived TCP connection. The socket is passive
  # (so no {:tcp, ...} messages reach the GenServer) and is always closed;
  # the previous implementation leaked one socket per call.
  defp hit_ilp_packet(payload, %{host: host, port: port} = state) do
    case :gen_tcp.connect(host, port, [:binary, active: false], @req_timeout) do
      {:ok, socket} ->
        sent = :gen_tcp.send(socket, payload)
        :gen_tcp.close(socket)
        state = bump(state, :requests)

        case sent do
          :ok -> {:ok, "no response", state}
          err -> {:error, err, bump(state, :failed)}
        end

      err ->
        {:error, err, bump(state, :failed)}
    end
  end

  # Waits for the response to `post_ref` and classifies it: a 2xx status is a
  # success (returning the body, or "" when the response carries none),
  # anything else is a failure.
  defp handle_http_result(post_ref, %{conn: conn} = state) do
    case :gun.await(conn, post_ref, @req_timeout) do
      {:response, :fin, code, _resp_headers} when code in 200..299 ->
        # `:fin` means no body follows; waiting for one would only time out.
        {:ok, "", bump(state, :succeeded)}

      {:response, :nofin, code, _resp_headers} when code in 200..299 ->
        case :gun.await_body(conn, post_ref, @req_timeout) do
          {:ok, resp_payload} ->
            {:ok, resp_payload, bump(state, :succeeded)}

          {:ok, resp_payload, _trailers} ->
            {:ok, resp_payload, bump(state, :succeeded)}

          err ->
            discard_stream(conn, post_ref)
            {:error, err, bump(state, :failed)}
        end

      {:response, _is_fin, code, _resp_headers} ->
        discard_stream(conn, post_ref)
        {:error, "response code #{code}", bump(state, :failed)}

      err ->
        discard_stream(conn, post_ref)
        {:error, err, bump(state, :failed)}
    end
  end

  # Stops a stream we are no longer reading and drops its messages that are
  # already queued, so they do not show up later in handle_info/2.
  defp discard_stream(conn, stream_ref) do
    :gun.cancel(conn, stream_ref)
    :gun.flush(stream_ref)
  end

  # Converts a configured {unit, value} pair to milliseconds via :timer.
  defp interval_from_env(unit_key, value_key, default_value) do
    unit = Application.get_env(:load, unit_key, :seconds)
    apply(:timer, unit, [Application.get_env(:load, value_key, default_value)])
  end

  # Increments a counter that must already exist in the state.
  defp bump(state, counter), do: Map.update!(state, counter, &(&1 + 1))
end

    # this is for websocket?
    # case :gun.await_up(conn, @gun_timeout) do
    #     {:ok, _} ->
    #         conn
    #     {:error, :timeout} ->
    #         :timer.sleep(:timer.seconds(2))
    #         create_connection(host_ip, port, http_opts, max_retries - 1)
    #     error ->
    #       Logger.error("Could not connect to host:#{inspect(host_ip)} port:#{inspect(port)} due to:#{inspect(error)}")
    #         error
    # end


  # def handle_info(:get_ip, %{host: host} = state) do

  #   case :inet.getaddr(host, :inet) do

  #     {:ok, ip} ->
  #       Process.send_after(self(), :connect, 0)
  #       {:noreply, Map.put(state, :ip, ip)}

  #     {:error, reason} ->
  #       Logger.error("[#{__MODULE__}] init failed for host:#{inspect(host)} due to:#{inspect(reason)}")
  #       Process.send_after(self(), :get_ip, @connect_delay)
  #   end

  # end