lib/chaperon/session.ex

defmodule Chaperon.Session do
  @moduledoc """
  Defines a Session struct and corresponding helper functions that are used
  within `Chaperon.Scenario` definitions.
  Most of Chaperon's logic is centered around these sessions.
  """

  defstruct id: nil,
            name: nil,
            results: %{},
            errors: %{},
            async_tasks: %{},
            config: %{},
            assigned: %{},
            metrics: %{},
            scenario: nil,
            cookies: [],
            parent_id: nil,
            parent_pid: nil,
            cancellation: nil,
            timeout_at: nil,
            interval_task: nil

  @type t :: %Chaperon.Session{
          id: String.t(),
          name: String.t(),
          results: map,
          errors: map,
          async_tasks: map,
          config: map,
          assigned: map,
          metrics: map,
          scenario: Chaperon.Scenario.t(),
          cookies: [String.t()],
          parent_id: String.t() | nil,
          parent_pid: pid | nil,
          cancellation: String.t() | nil,
          timeout_at: DateTime.t() | nil,
          interval_task: Task.t() | nil
        }

  @type metric :: {atom, any} | any

  @type config_key :: [atom] | atom | String.t()

  @type func :: Chaperon.Action.CallFunction.callback()

  require Logger
  alias Chaperon.Session
  alias Chaperon.Session.Error
  alias Chaperon.Action
  alias Chaperon.Action.SpreadAsync
  alias Chaperon.Action.HTTP
  import Chaperon.Timing
  import Chaperon.Util
  use Chaperon.Session.Logging

  @default_timeout seconds(10)

  @type result_callback :: atom | (Session.t(), any -> Session.t())

  defmacro __using__(_opts) do
    quote do
      require Logger
      require Chaperon.Session
      import Chaperon.Session
    end
  end

  def new_id() do
    UUID.uuid4()
  end

  def fork(session) do
    %{session | id: new_id(), parent_id: session.id, parent_pid: self()}
    |> reset_action_metadata()
    |> ws_reset()
  end

  def ws_reset(session) do
    session
    |> Chaperon.Action.WebSocket.reset_websockets()
  end

  @doc """
  Concurrently spreads a given action with a given rate over a given time
  interval within `session`.
  """
  @spec cc_spread(
          Session.t(),
          atom,
          SpreadAsync.rate(),
          SpreadAsync.time(),
          atom | nil
        ) :: Session.t()

  def cc_spread(session, func_name, rate, interval) do
    cc_spread(session, func_name, rate, interval, nil)
  end

  def cc_spread(session, func_name, rate, interval, task_name)
      when is_float(rate) or is_float(interval) do
    new_rate = round(rate)
    new_interval = round(interval)

    if is_float(rate) do
      session
      |> log_warn("cc_spread rate is float: #{rate}, rounding to: #{new_rate}")
    end

    if is_float(interval) do
      session
      |> log_warn("cc_spread interval is float: #{interval}, rounding to: #{new_interval}")
    end

    session
    |> cc_spread(func_name, new_rate, interval, task_name)
  end

  def cc_spread(session, func_name, rate, interval, task_name)
      when is_integer(rate) and is_integer(interval) do
    session
    |> run_action(%SpreadAsync{
      func: func_name,
      rate: rate,
      interval: interval,
      task_name: task_name || func_name
    })
  end

  @type cc_spread_options ::
          [
            rate: SpreadAsync.rate(),
            interval: SpreadAsync.time(),
            name: atom | nil
          ]
          | %{
              rate: SpreadAsync.rate(),
              interval: SpreadAsync.time(),
              name: atom | nil
            }

  @doc """
  Concurrently spreads a given action with a given rate over a given time
  interval within `session`.
  """
  @spec cc_spread(Session.t(), atom, cc_spread_options) :: Session.t()
  def cc_spread(session, func_name, opts \\ []) do
    session
    |> run_action(%SpreadAsync{
      func: func_name,
      rate: opts[:rate],
      interval: opts[:interval],
      task_name: opts[:name] || func_name
    })
  end

  @doc """
  Loops a given action for a given duration, returning the resulting session at
  the end.
  """
  @spec loop(Session.t(), atom, Chaperon.Timing.duration()) :: Session.t()
  def loop(session, action_name, duration) do
    session
    |> run_action(%Action.Loop{
      action: %Action.CallFunction{func: action_name},
      duration: duration
    })
  end

  @doc """
  Repeats calling a given function with session a given amount of times,
  returning the resulting session at the end.

  ## Example

      session
      |> repeat(:foo, 2)

      # same as:
      session
      |> foo
      |> foo
  """
  @spec repeat(Session.t(), func, non_neg_integer) :: Session.t()
  def repeat(session, _, 0), do: session

  def repeat(session, func, amount) when amount > 0 do
    session
    |> call(func)
    |> repeat(func, amount - 1)
  end

  @doc """
  Repeats calling a given function with session and additional args a given
  amount of times, returning the resulting session at the end.

  ## Example

      session
      |> repeat(:foo, ["bar", "baz"], 2)

      # same as:
      session
      |> foo("bar", "baz") |> foo("bar", "baz")
  """
  @spec repeat(Session.t(), func, [any], non_neg_integer) :: Session.t()
  def repeat(session, _, _, 0), do: session

  def repeat(session, func, args, amount) when amount > 0 do
    session
    |> call(func, args)
    |> repeat(func, args, amount - 1)
  end

  @doc """
  Repeats calling a given function with session a given amount of times,
  returning the resulting session at the end. Also traces durations for all
  calls to the given function.

  ## Example

      session
      |> repeat_traced(:foo, 2)

      # same as:
      session
      |> call_traced(:foo)
      |> call_traced(:foo)
  """
  @spec repeat_traced(Session.t(), func, non_neg_integer) :: Session.t()
  def repeat_traced(session, _, 0), do: session

  def repeat_traced(session, func, amount) when amount > 0 do
    session
    |> call_traced(func)
    |> repeat_traced(func, amount - 1)
  end

  @doc """
  Repeats calling a given function with session and additional args a given
  amount of times, returning the resulting session at the end.

  ## Example

      session
      |> repeat_traced(:foo, ["bar", "baz"], 2)

      # same as:
      session
      |> call_traced(:foo, ["bar", "baz"])
      |> call_traced(:foo, ["bar", "baz"])
  """
  @spec repeat_traced(Session.t(), func, [any], non_neg_integer) :: Session.t()
  def repeat_traced(session, _, _, 0), do: session

  def repeat_traced(session, func, args, amount) when amount > 0 do
    session
    |> call_traced(func, args)
    |> repeat_traced(func, args, amount - 1)
  end

  @type retry_options :: [
          retries: non_neg_integer,
          delay: non_neg_integer,
          random_delay: non_neg_integer
        ]

  @default_retry_opts [retries: 1, random_delay: 1_000]

  @doc """
  Call a given function (with arguments). If any exception is raised,
  retry the call a given amount of times (defaults to 1 retry).
  The retry can be delayed by a fixed or random duration (defaults to 1s).

  Example:
      session
      |> retry_on_error(:publish, ["post title"], retries: 10, delay: 0.5 |> seconds)
      # call function without args
      |> retry_on_error(:cleanup, [], retries: 10, delay: 0.5 |> seconds)

      # retry once by default
      session
      |> retry_on_error(:publish, ["post title"], random_delay: 5 |> seconds)

      # retry once with default delay of 1s
      session
      |> retry_on_error(:publish, ["post title"])

      # retry function without args and default options
      session
      |> retry_on_error(:publish_default)
  """
  @spec retry_on_error(Session.t(), func, [any], retry_options) :: Session.t()
  def retry_on_error(session, func, args \\ [], opts \\ @default_retry_opts) do
    retries = opts[:retries] || 1

    try do
      session
      |> call(func, args)
    rescue
      err ->
        session
        |> log_error(inspect(err))

        retries =
          case retries do
            :infinity -> :infinity
            r -> r - 1
          end

        if retries > 0 do
          opts = Keyword.merge(opts, retries: retries)

          session
          |> log_error("Retrying #{func} another #{retries} times")
          |> retry_delay(opts)
          |> retry_on_error(func, args, opts)
        else
          reraise err, __STACKTRACE__
        end
    end
  end

  defp retry_delay(session, opts) do
    case {opts[:random_delay], opts[:delay]} do
      {nil, nil} ->
        session

      {nil, delay} ->
        session
        |> delay(delay)

      {r_delay, nil} ->
        session
        |> delay(:rand.uniform(r_delay))

      {r_delay, _} ->
        session
        |> delay(:rand.uniform(r_delay))
    end
  end

  @doc """
  Returns the session's configured timeout or the default timeout, if none
  specified.

  ## Example

      iex> session = %Chaperon.Session{config: %{timeout: 10}}
      iex> session |> Chaperon.Session.timeout
      10
  """
  @spec timeout(Session.t()) :: non_neg_integer
  def timeout(session) do
    session.config[:timeout] || @default_timeout
  end

  @spec await(Session.t(), atom, Task.t()) :: Session.t()
  def await(session, _task_name, nil), do: session

  def await(session, task_name, task = %Task{}) do
    session = session |> maybe_spawn_interval_task()

    task_session = task |> Task.await(session |> timeout)

    session
    |> maybe_end_interval_task()
    |> remove_async_task(task_name, task)
    |> merge_async_task_result(task_session, task_name)
  end

  @spec await(Session.t(), atom, [Task.t()]) :: Session.t()
  def await(session, task_name, tasks) when is_list(tasks) do
    tasks
    |> Enum.reduce(session, &await(&2, task_name, &1))
  end

  @doc """
  Await an async task with a given `task_name` in `session`.
  """
  @spec await(Session.t(), atom) :: Session.t()
  def await(session, task_name) when is_atom(task_name) do
    session
    |> await(task_name, session.async_tasks[task_name])
  end

  @spec await(Session.t(), [atom]) :: Session.t()
  def await(session, task_names) when is_list(task_names) do
    task_names
    |> Enum.reduce(session, &await(&2, &1))
  end

  @doc """
  Await all async tasks with a given `task_name` in `session`.
  """
  @spec await_all(Session.t(), atom) :: Session.t()
  def await_all(session, task_name) do
    session
    |> await(task_name, session.async_tasks[task_name])
  end

  @doc """
  Returns a single task or a list of tasks associated with a given `task_name`
  in `session`.
  """
  @spec async_task(Session.t(), atom) :: Task.t() | [Task.t()]
  def async_task(session, task_name) do
    session.async_tasks[task_name]
  end

  @doc """
  Performs a HTTP GET request on `session`'s base_url and `path`.
  Takes an optional list of options to be passed to `HTTPotion`.
  """
  @spec get(Session.t(), String.t(), HTTP.options()) :: Session.t()
  def get(session, path, opts \\ []) do
    session
    |> run_action(Action.HTTP.get(path, opts))
  end

  @doc """
  Performs a HTTP POST request on `session`'s base_url and `path`.
  Takes an optional list of options to be passed to `HTTPotion`.
  """
  @spec post(Session.t(), String.t(), HTTP.options()) :: Session.t()
  def post(session, path, opts \\ []) do
    session
    |> run_action(Action.HTTP.post(path, opts))
  end

  @doc """
  Performs a HTTP PUT request on `session`'s base_url and `path`.
  Takes an optional list of options to be passed to `HTTPotion`.
  """
  @spec put(Session.t(), String.t(), HTTP.options()) :: Session.t()
  def put(session, path, opts \\ []) do
    session
    |> run_action(Action.HTTP.put(path, opts))
  end

  @doc """
  Performs a HTTP PATCH request on `session`'s base_url and `path`.
  Takes an optional list of options to be passed to `HTTPotion`.
  """
  @spec patch(Session.t(), String.t(), HTTP.options()) :: Session.t()
  def patch(session, path, opts) do
    session
    |> run_action(Action.HTTP.patch(path, opts))
  end

  @doc """
  Performs a HTTP DELETE request on `session`'s base_url and `path`.
  Takes an optional list of options to be passed to `HTTPotion`.
  """
  @spec delete(Session.t(), String.t(), HTTP.options()) :: Session.t()
  def delete(session, path, opts \\ []) do
    session
    |> run_action(Action.HTTP.delete(path, opts))
  end

  @doc """
  Performs a WebSocket connection attempt on `session`'s base_url and
  `path`.
  """
  @spec ws_connect(Session.t(), String.t(), Keyword.t()) :: Session.t()
  def ws_connect(session, path, options \\ []) do
    session
    |> run_action(Action.WebSocket.connect(path, options))
  end

  @doc """
  Performs a WebSocket message send on `session`s WebSocket connection.
  Takes an optional list of `options` to be passed along to `Socket.Web.send/3`.
  """
  @spec ws_send(Session.t(), any, Keyword.t()) :: Session.t()
  def ws_send(session, msg, options \\ []) do
    session
    |> run_action(Action.WebSocket.send(msg, options))
  end

  @doc """
  Performs a WebSocket message receive on `session`s WebSocket connection.
  Takes an optional list of `options` to be passed along to `Socket.Web.recv/2`.
  """
  @spec ws_recv(Session.t(), Keyword.t()) :: Session.t()
  def ws_recv(session, options \\ []) do
    session
    |> run_action(Action.WebSocket.recv(options))
  end

  @doc """
  Performs a WebSocket message receive on `session`s WebSocket connection.
  Takes an optional list of `options` to be passed along to `Socket.Web.recv/2`.
  """
  @spec ws_await_recv(Session.t(), any, Keyword.t()) :: Session.t()
  def ws_await_recv(session, expected_message, options \\ []) do
    opts =
      options
      |> Keyword.merge(with_result: &ws_await_recv_loop(&1, expected_message, &2, options))

    session
    |> ws_recv(opts)
  end

  defp ws_await_recv_loop(session, expected_msg, msg, options) do
    if is_expected_message(msg, expected_msg) do
      session
      |> log_debug("Awaited expected WS message")
      |> call_callback(options[:with_result], msg)
    else
      session
      |> log_debug("Ignoring unexpected WS message #{inspect(msg)}")
      |> ws_await_recv(expected_msg, options)
    end
  end

  defp is_expected_message(msg, expected_msg) when is_function(expected_msg) do
    expected_msg.(msg)
  end

  defp is_expected_message(msg, expected_msg) do
    case msg do
      ^expected_msg -> true
      _ -> false
    end
  end

  @doc """
  Closes the session's websocket connection.
  Takes an optional list of `options` to be passed along to `Socket.Web.close/2`.
  """
  @spec ws_close(Session.t(), Keyword.t()) :: Session.t()
  def ws_close(session, options \\ []) do
    session
    |> run_action(Action.WebSocket.close(options))
  end

  @doc """
  Calls a function inside the `session`'s scenario module with the given name
  and args, returning the resulting session.
  """
  def call(session, func, args \\ [])

  @spec call(Session.t(), func, [any]) :: Session.t()
  def call(session, func, args)
      when is_atom(func) do
    apply(session.scenario.module, func, [session | args])
  end

  def call(session, {module, func}, args)
      when is_atom(module) and is_atom(func) do
    apply(module, func, [session | args])
  end

  @doc """
  Calls a given function or a function with the given name and args, then
  captures duration metrics in `session`.
  """
  @spec call_traced(Session.t(), func, [any]) :: Session.t()
  def call_traced(session, func, args \\ []) do
    session
    |> run_action(%Action.CallFunction{func: func, args: args})
  end

  @doc """
  Runs & captures metrics of running another `Chaperon.Scenario` from `session`
  using `session`s config.
  """
  @spec run_scenario(Session.t(), Action.RunScenario.scenario()) :: Session.t()
  def run_scenario(session, scenario) do
    session
    |> run_action(Action.RunScenario.new(scenario, session.config, :local))
  end

  @doc """
  Runs & captures metrics of running another `Chaperon.Scenario` from `session`
  with a given `config`.
  """
  @spec run_scenario(
          Session.t(),
          Action.RunScenario.scenario(),
          map,
          boolean
        ) :: Session.t()
  def run_scenario(session, scenario, config, merge_config \\ true) do
    session
    |> run_scenario_with_config(scenario, config, merge_config, :local)
  end

  @doc """
  Runs & captures metrics of running another `Chaperon.Scenario` from `session`
  with a given `config` on a random node in the Chaperon cluster.
  """
  def schedule_scenario(session, scenario) do
    session
    |> run_action(Action.RunScenario.new(scenario, session.config, :cluster))
  end

  @doc """
  Runs & captures metrics of running another `Chaperon.Scenario` from `session`
  with a given `config`.
  """
  @spec schedule_scenario(
          Session.t(),
          Action.RunScenario.scenario(),
          map,
          boolean
        ) :: Session.t()
  def schedule_scenario(session, scenario, config, merge_config \\ true) do
    session
    |> run_scenario_with_config(scenario, config, merge_config, :cluster)
  end

  @spec run_scenario_with_config(
          Session.t(),
          Action.RunScenario.scenario(),
          map,
          boolean,
          Action.RunScenario.scheduler()
        ) :: Session.t()
  defp run_scenario_with_config(session, scenario, config, merge_config, scheduler) do
    scenario_config =
      if merge_config do
        DeepMerge.deep_merge(session.config, config)
      else
        config
      end

    session
    |> run_action(Action.RunScenario.new(scenario, scenario_config, scheduler))
  end

  @doc """
  Runs a given action within `session` and returns the resulting
  session.
  """
  @spec run_action(Session.t(), Chaperon.Actionable.t()) :: Session.t()
  def run_action(session = %{cancellation: reason}, _) when is_binary(reason) do
    session
  end

  def run_action(session, action) do
    case Chaperon.Actionable.run(action, session) do
      {:error, %Chaperon.Session.Error{reason: reason}} ->
        session
        |> log_error("Session.run_action #{action} failed: #{inspect(reason)}")

        put_in(session.errors[action], reason)

      {:error, %Chaperon.Action.Error{reason: reason}} ->
        session
        |> log_error("Session.run_action #{action} failed: #{inspect(reason)}")

        put_in(session.errors[action], reason)

      {:error, reason} ->
        session
        |> log_debug("Session.run_action #{action} failed: #{inspect(reason)}")

        put_in(session.errors[action], reason)

      {:ok, new_session = %Chaperon.Session{}} ->
        session
        |> log_debug("SUCCESS #{action}")

        new_session
    end
  end

  @doc """
  Assigns a given list of key-value pairs (as a `Keyword` list) in `session`
  for further usage later.

  ## Example

      iex> alias Chaperon.Session; import Session
      iex> session = %Session{} |> assign(foo: 1, bar: "hello")
      iex> session.assigned.foo
      1
      iex> session.assigned.bar
      "hello"
      iex> session.assigned
      %{foo: 1, bar: "hello"}
  """
  @spec assign(Session.t(), Keyword.t()) :: Session.t()
  def assign(session, assignments) do
    assignments
    |> Enum.reduce(session, fn {k, v}, session ->
      put_in(session.assigned[k], v)
    end)
  end

  @doc """
  Assigns a given list of key-value pairs (as a `Keyword` list) under a given
  namespace in `session` for further usage later.

  ## Example

      iex> alias Chaperon.Session; import Session
      iex> session = %Session{} |> assign(:api, auth_token: "auth123", login: "foo@bar.com")
      iex> session.assigned.api
      %{auth_token: "auth123", login: "foo@bar.com"}
      iex> session.assigned.api.auth_token
      "auth123"
      iex> session.assigned.api.login
      "foo@bar.com"
      iex> session.assigned
      %{api: %{auth_token: "auth123", login: "foo@bar.com"}}
  """
  @spec assign(Session.t(), atom, Keyword.t()) :: Session.t()
  def assign(session, namespace, assignments) do
    assignments = assignments |> Enum.into(%{})

    session
    |> update_assign([{namespace, &Map.merge(&1 || %{}, assignments)}])
  end

  @doc """
  Updates assignments based on a given Keyword list of functions to be used for
  updating `assigned` in `session`.

  ## Example

      iex> alias Chaperon.Session; import Session
      iex> session = %Session{} |> assign(foo: 1, bar: "hello")
      iex> session.assigned
      %{foo: 1, bar: "hello"}
      iex> session = session |> update_assign(foo: &(&1 + 2))
      iex> session.assigned.foo
      3
      iex> session.assigned.bar
      "hello"
      iex> session.assigned
      %{foo: 3, bar: "hello"}
  """
  @spec update_assign(Session.t(), Keyword.t((any -> any))) :: Session.t()
  def update_assign(session, assignments) do
    assignments
    |> Enum.reduce(session, fn {k, func}, session ->
      update_in(session.assigned[k], func)
    end)
  end

  @doc """
  Updates assignments based on a given Keyword list of functions to be used for
  updating `assigned` within `namespace` in `session`.

  ## Example

      iex> alias Chaperon.Session; import Session
      iex> session = %Session{} |> assign(:api, auth_token: "auth123", login: "foo@bar.com")
      iex> session.assigned.api
      %{auth_token: "auth123", login: "foo@bar.com"}
      iex> session = session |> update_assign(:api, login: &("test" <> &1))
      iex> session.assigned.api.login
      "testfoo@bar.com"
      iex> session.assigned.api
      %{auth_token: "auth123", login: "testfoo@bar.com"}
  """
  @spec update_assign(Session.t(), atom, Keyword.t((any -> any))) :: Session.t()
  def update_assign(session, namespace, assignments) do
    assignments
    |> Enum.reduce(session, fn {k, func}, session ->
      update_in(session.assigned[namespace][k], func)
    end)
  end

  def delete_assign(session, key) do
    update_in(session.assigned, &Map.delete(&1, key))
  end

  def delete_assign(session, namespace, key) do
    update_in(session.assigned[namespace], &Map.delete(&1, key))
  end

  @doc """
  Updates a session's config based on a given Keyword list of functions to be
  used for updating `config` in `session`.

  ## Example

      iex> alias Chaperon.Session; import Session
      iex> session = %Session{config: %{foo: 1, bar: "hello"}}
      iex> session.config
      %{foo: 1, bar: "hello"}
      iex> session = session |> update_config(foo: &(&1 + 2))
      iex> session.config.foo
      3
      iex> session.config.bar
      "hello"
      iex> session.config
      %{foo: 3, bar: "hello"}
  """
  @spec update_config(Session.t(), Keyword.t((any -> any))) :: Session.t()
  def update_config(session, assignments) do
    assignments
    |> Enum.reduce(session, fn {k, func}, session ->
      update_in(session.config[k], func)
    end)
  end

  @doc """
  Updates a session's config based on a given Keyword list of functions to be
  used for updating `config` in `session` under a given namespace.

  ## Example

      iex> alias Chaperon.Session; import Session
      iex> session = %Session{config: %{foo: 1, bar: %{baz: "hello", quux: 0}}}
      iex> session.config
      %{foo: 1, bar: %{baz: "hello", quux: 0}}
      iex> session = session |> update_config(:bar, quux: &(&1 + 2))
      iex> session.config
      %{foo: 1, bar: %{baz: "hello", quux: 2}}
  """
  @spec update_config(Session.t(), atom, Keyword.t((any -> any))) :: Session.t()
  def update_config(session, namespace, assignments) do
    assignments
    |> Enum.reduce(session, fn {k, func}, session ->
      update_in(session.config[namespace][k], func)
    end)
  end

  @doc """
  Updates a session's config based on a given Keyword list of new values to be
  used for `config` in `session`.

  ## Example

      iex> alias Chaperon.Session; import Session
      iex> session = %Session{config: %{foo: 1, bar: "hello"}}
      iex> session.config
      %{foo: 1, bar: "hello"}
      iex> session = session |> set_config(foo: 10, baz: "wat")
      iex> session.config.foo
      10
      iex> session.config.bar
      "hello"
      iex> session.config.baz
      "wat"
      iex> session.config
      %{foo: 10, bar: "hello", baz: "wat"}
  """
  @spec set_config(Session.t(), Keyword.t(any)) :: Session.t()
  def set_config(session, assignments) do
    assignments
    |> Enum.reduce(session, fn {k, val}, session ->
      put_in(session.config[k], val)
    end)
  end

  @doc """
  Updates a session's config based on a given Keyword list of new values inside
  a given namespace to be used for `config` in `session`.

  ## Example

      iex> alias Chaperon.Session; import Session
      iex> session = %Session{config: %{foo: 1, bar: %{baz: "hello",  quux: 0}}}
      iex> session.config
      %{foo: 1, bar: %{baz: "hello", quux: 0}}
      iex> session = session |> set_config(:bar, quux: 10)
      iex> session.config.bar.quux
      10
      iex> session.config.bar
      %{baz: "hello", quux: 10}
      iex> session.config
      %{foo: 1, bar: %{baz: "hello", quux: 10}}
  """
  @spec set_config(Session.t(), atom, Keyword.t(any)) :: Session.t()
  def set_config(session, namespace, assignments) do
    assignments
    |> Enum.reduce(session, fn {k, val}, session ->
      put_in(session.config[namespace][k], val)
    end)
  end

  @doc """
  Get a (possibly nested) config value.

  ## Example

      iex> alias Chaperon.Session; import Session
      iex> session = %Session{config: %{foo: 1, bar: %{val1: "V1", val2: "V2"}}}
      iex> session.config
      %{foo: 1, bar: %{val1: "V1", val2: "V2"}}
      iex> session |> config(:foo)
      1
      iex> try do
      iex>   session |> config(:invalid) # does not exist
      iex> rescue
      iex>   _ in Chaperon.Session.RequiredConfigMissingError -> :failed
      iex> end
      :failed
      iex> session |> config([:bar, :val1])
      "V1"
      iex> session |> config([:bar, :val2])
      "V2"
      iex> session |> config("bar.val1")
      "V1"
      iex> session |> config("bar.val2")
      "V2"
  """
  @spec config(Session.t(), config_key) :: any
  def config(session, key) do
    case config(session, key, :___no_default_config_given___) do
      :___no_default_config_given___ ->
        session
        |> required_config(session.config, key)

      val ->
        val
    end
  end

  @doc """
  Get a (possibly nested) config value and return the given default value,
  if config value does not exist.

  ## Example

      iex> alias Chaperon.Session; import Session
      iex> session = %Session{config: %{foo: 1, bar: %{val1: "V1", val2: "V2"}}}
      iex> session.config
      %{foo: 1, bar: %{val1: "V1", val2: "V2"}}
      iex> session |> config(:foo)
      1
      iex> try do
      iex>   session |> config(:invalid) # no default value given
      iex> rescue
      iex>   _ in Chaperon.Session.RequiredConfigMissingError -> :failed
      iex> end
      :failed
      iex> session |> config(:invalid, "default")
      "default"
      iex> session |> config([:bar, :val1])
      "V1"
      iex> session |> config([:bar, :val2])
      "V2"
      iex> session |> config("bar.val1")
      "V1"
      iex> session |> config("bar.val2")
      "V2"
  """
  @spec config(Session.t(), config_key, any) :: any
  def config(session, key, default_val) do
    case key do
      keys when is_list(keys) ->
        session
        |> find_nested_config_val(keys, default_val)

      path when is_binary(path) ->
        keys =
          path
          |> String.split(".")
          |> Enum.map(&String.to_atom/1)

        session
        |> config(keys, default_val)

      _ ->
        Map.get(session.config, key, default_val)
    end
  end

  defp find_nested_config_val(session, _keys = [key1 | rest], default_val) do
    if Map.has_key?(session.config, key1) do
      rest
      |> Enum.reduce(session.config[key1], fn
        key, acc when is_map(acc) ->
          acc
          |> Map.get(key, default_val)

        _key, acc ->
          acc
      end)
    else
      default_val
    end
  end

  defmodule RequiredConfigMissingError do
    defexception config_key: nil, session: nil

    @type t :: %__MODULE__{
            config_key: Chaperon.Session.config_key(),
            session: Chaperon.Session.t()
          }

    @spec new(Chaperon.Session.config_key(), Chaperon.Session.t()) :: t()
    def new(key, session) do
      %__MODULE__{config_key: key, session: session}
    end

    @spec message(t()) :: String.t()
    def message(%__MODULE__{config_key: key, session: session}) do
      "[Chaperon.Session.RequiredConfigMissingError #{session.id} #{session.name}] | #{
        inspect(key)
      } "
    end
  end

  defp required_config(session, key) do
    session
    |> required_config(session.config, key)
  end

  defp required_config(session, map, key) do
    case Map.fetch(map, key) do
      {:ok, val} ->
        val

      :error ->
        session
        |> log_error("Config key #{inspect(key)} not found")

        raise RequiredConfigMissingError.new(key, session)
    end
  end

  @spec skip_query_params_in_metrics(Session.t()) :: Session.t()
  def skip_query_params_in_metrics(session) do
    session
    |> set_config(skip_query_params_in_metrics: true)
  end

  @doc """
  Runs a given function with args asynchronously from `session`.
  """
  @spec async(Session.t(), atom | {atom, atom}, [any], atom | nil) :: Session.t()
  def async(session, target, args \\ [], task_name \\ nil) do
    {mod, func} =
      case target do
        {module, f} ->
          {module, f}

        f when is_atom(f) ->
          {session.scenario.module, f}
      end

    session
    |> run_action(%Action.Async{
      module: mod,
      function: func,
      args: args,
      task_name: task_name || func
    })
  end

  @doc """
  Delays further execution of `session` by a given `duration`.
  `duration` can also be `{:random, integer_val}` in which case `random_delay`
  is called with `integer_val`.

  Example:
      session
      |> delay(3 |> seconds)
      |> get("/")

      # or with random delay up to 3 seconds:
      session
      |> delay({:random, 3 |> seconds})
      |> get("/")
  """
  @spec delay(Session.t(), Chaperon.Timing.duration()) :: Session.t()
  def delay(session, {:random, max_duration}) do
    session
    |> random_delay(max_duration)
  end

  def delay(session, duration) do
    :timer.sleep(duration)
    session
  end

  @doc """
  Delays further execution of `session` by a random value up to the given
  `duration`.
  """
  @spec random_delay(Session.t(), Chaperon.Timing.duration()) :: Session.t()
  def random_delay(session, max_duration) do
    session
    |> delay(:rand.uniform(max_duration))
  end

  @doc """
  Adds a given `Task` to `session` under a given `name`.
  """
  @spec add_async_task(Session.t(), atom, Task.t()) :: Session.t()
  def add_async_task(session, name, task) do
    update_in(session.async_tasks[name], &[task | List.wrap(&1)])
  end

  @doc """
  Send a signal to the current session's async task with the given name.

  Example:

      # scenario run function
      def run(session) do
        session
        |> async(:search_entries, ["chaperon", "load testing"])
        |> async(:do_other_stuff)
        |> signal(:search_entries, :continue_search)
      end

      def search_entries(session, tag1, tag2) do
        session
        |> get("/search", json: [tag: tag1])
        |> await_signal(:continue_search)
        |> get("/search", json: [tag: tag2])
      end
  """
  @spec signal(Session.t(), atom, any) :: Session.t()
  def signal(session, name, signal) do
    case session.async_tasks[name] do
      ts when is_list(ts) ->
        ts
        |> Enum.each(&send(&1.pid, {:chaperon_signal, signal}))

      t ->
        send(t.pid, {:chaperon_signal, signal})
    end

    session
  end

  @doc """
  Sends a signal to the current session's parent session (that spawned it via
  a call to `Session.async`).

  Example:

      # scenario run function
      def run(session) do
        stream_path = "/secret/live/stream.json"
        session
        |> async(:connect_to_stream, [stream_path])
        |> await_signal({:connected_to_stream, stream_path})
        # ...
      end

      def connect_to_stream(session, stream_path) do
        session
        |> ws_connect(stream_path)
        |> signal_parent({:connected_to_stream, stream_path})
        |> stream_data
      end

      # ...
  """
  @spec signal_parent(Session.t(), any) :: Session.t()
  def signal_parent(session, signal) do
    send(session.parent_pid, {:chaperon_signal, signal})
    session
  end

  @doc """
  Await any incoming signal for current session within given timeout.
  If callback is provided, it will be called with the session and the received
  signal value.

  Example:

      session
      |> await_signal_or_timeout(5 |> seconds, fn(session, signal) ->
        session
        |> log_info("Got signal")
        |> assign(signal: signal)
      end)

      # or using an atom as the callback:

      def run(session) do
        session
        |> await_signal_or_timeout(5 |> seconds, :got_signal)
      end

      def got_signal(session, signal) do
        session
        |> log_info("Got signal")
        |> assign(signal: signal)
      end
  """
  @spec await_signal_or_timeout(
          Session.t(),
          non_neg_integer,
          nil | (Session.t(), any -> Session.t())
        ) :: Session.t()
  def await_signal_or_timeout(session, timeout, callback \\ nil) do
    await_common(
      session,
      :__no_expected_signal__,
      fn session, signal -> session |> call_callback(callback, signal) end,
      timeout
    )
  end

  @doc """
  Await any signal and call a given callback with the session and the received
  signal.

  Example:

      session
      |> await_signal(fn(session, signal) ->
        session
        |> assign(signal: signal)
      end)
  """
  @spec await_signal(
          Session.t(),
          any | (Session.t(), any -> Session.t())
        ) :: Session.t()
  def await_signal(session, callback) when is_function(callback) do
    await_common(
      session,
      :__no_expected_signal__,
      callback,
      session |> timeout
    )
  end

  def await_signal(session, expected_signal) do
    await_common(
      session,
      expected_signal,
      fn session, _ -> session end,
      session |> timeout
    )
  end

  @doc """
  Await an expected signal with a given timeout.

  Example:

      session
      |> await_signal(:continue_search, 5 |> seconds)
      |> get("/search", params: [query: "Got load test?"])
  """
  @spec await_signal(Session.t(), any, non_neg_integer) :: Session.t()
  def await_signal(session, expected_signal, timeout) do
    await_common(
      session,
      expected_signal,
      fn session, _ -> session end,
      timeout
    )
  end

  defp await_common(session, expected_signal, callback, timeout) do
    session = %{session | timeout_at: get_timeout_at(timeout)}

    msg_ref = make_ref()

    tref =
      case session.config[:interval] do
        {interval, fun} ->
          :timer.send_interval(interval, {:chaperon_interval, msg_ref, fun})

        nil ->
          nil
      end

    session = do_await_common(session, expected_signal, callback, timeout, msg_ref)

    if tref do
      :timer.cancel(tref)
    end

    session
  end

  defp get_timeout_at(:infinity), do: :infinity
  defp get_timeout_at(timeout), do: DateTime.utc_now() |> DateTime.add(timeout, :millisecond)

  defp do_await_common(
         session,
         expected_signal,
         callback,
         timeout,
         msg_ref
       ) do
    remaining_time = max(0, get_remaining_time(session.timeout_at))

    receive do
      {:chaperon_signal, ^expected_signal} ->
        callback.(session, expected_signal)

      {:chaperon_signal, signal}
      when expected_signal == :__no_expected_signal__ ->
        callback.(session, signal)

      {:chaperon_interval, ^msg_ref, fun} ->
        session
        |> fun.()
        |> do_await_common(expected_signal, callback, timeout, msg_ref)
    after
      remaining_time ->
        session |> error({:timeout, :await_signal, timeout})
    end
  end

  defp get_remaining_time(:infinity), do: :infinity

  defp get_remaining_time(timeout_at),
    do: DateTime.diff(timeout_at, DateTime.utc_now(), :millisecond)

  @doc """
  Removes a `Task` with a given `task_name` from `session`.
  """
  @spec remove_async_task(Session.t(), atom, Task.t()) :: Session.t()
  def remove_async_task(session, task_name, task) do
    case session.async_tasks[task_name] do
      nil ->
        session

      [^task] ->
        update_in(session.async_tasks, &Map.delete(&1, task_name))

      tasks when is_list(tasks) ->
        update_in(session.async_tasks[task_name], &List.delete(&1, task))
    end
  end

  @doc """
  Adds a given HTTP request `result` to `session` for the given `action`.
  """
  @spec add_result(Session.t(), Chaperon.Actionable.t(), any) :: Session.t()
  def add_result(session, action, result) do
    case session.config[:store_results] do
      true ->
        session
        |> log_debug("Add result #{action}")

        update_in(session.results[action], &[result | List.wrap(&1)])

      _ ->
        session
    end
  end

  @doc """
  Adds a given WebSocket action `result` to `session` for a given `action`.
  """
  @spec add_ws_result(Session.t(), Chaperon.Actionable.t(), any) :: Session.t()
  def add_ws_result(session, action, result) do
    case session.config[:store_results] do
      true ->
        session
        |> log_debug("Add WS result #{action} : #{inspect(result)}")

        update_in(session.results[action], &[result | List.wrap(&1)])

      _ ->
        session
    end
  end

  @doc """
  Stores a given metric `val` under a given `name` in `session`.
  """
  @spec add_metric(Session.t(), metric, any) :: Session.t()
  def add_metric(session, metric, val) do
    if session |> add_metric?(metric) do
      session
      |> log_debug("Add metric #{inspect(metric)} : #{val}")

      update_in(session.metrics[metric], &[val | List.wrap(&1)])
    else
      session
    end
  end

  defp add_metric?(session, metric) do
    case session.config[:metrics] do
      nil ->
        true

      f when is_function(f) ->
        f.(metric)

      types ->
        case metric do
          {metric_type, _} ->
            MapSet.member?(types, metric_type)

          metric_type ->
            MapSet.member?(types, metric_type)
        end
    end
  end

  @doc """
  Stores a `Chaperon.Session.Error` in `session` for a given `action` for later
  inspection.
  """
  @spec add_error(
          Session.t(),
          Chaperon.Actionable.t(),
          {:error, Error.t()}
        ) :: Session.t()
  def add_error(session, action, error) do
    put_in(session.errors[action], error)
  end

  @doc """
  Stores HTTP response cookies in `session` cookie store for further outgoing
  requests.
  """
  @spec store_response_cookies(Session.t(), HTTPoison.Response.t()) :: Session.t()
  def store_response_cookies(session, response = %HTTPoison.Response{}) do
    response
    |> response_cookies()
    |> strip_cookie_attributes()
    |> store_cookies(session)
  end

  def response_cookies(response = %HTTPoison.Response{}) do
    response.headers
    |> Enum.filter(fn {key, _} -> String.match?(key, ~r/\Aset-cookie\z/i) end)
    |> Enum.map(fn {_, value} -> value end)
  end

  # Strips attributes like Expires and HttpOnly from cookies. Only the name and
  # value are allowed when sending cookies in requests.
  defp strip_cookie_attributes(cookies) do
    cookies
    |> Enum.map(fn value ->
      String.replace(value, ~r/;.*$/, "", global: false)
    end)
  end

  defp store_cookies([], session) do
    # do nothing
    session
  end

  defp store_cookies(cookies, session) when is_list(cookies) do
    put_in(session.cookies, [cookies |> Enum.join("; ")])
  end

  @doc """
  Deletes all cookies from `session`'s cookie store.

      iex> session = %Chaperon.Session{cookies: ["cookie_val1", "cookie_val2"]}
      iex> session = session |> Chaperon.Session.delete_cookies
      iex> session.cookies
      []
  """
  @spec delete_cookies(Session.t()) :: Session.t()
  def delete_cookies(session) do
    put_in(session.cookies, [])
  end

  @spec async_results(Session.t(), atom) :: map
  defp async_results(task_session, task_name) do
    for {k, v} <- task_session.results do
      {task_name, {:async, k, v}}
    end
    |> Enum.into(%{})
  end

  @spec async_metrics(Session.t(), atom) :: map
  defp async_metrics(task_session, task_name) do
    for {k, v} <- task_session.metrics do
      {task_name, {:async, k, v}}
    end
    |> Enum.into(%{})
  end

  @spec merge_async_task_result(Session.t(), Session.t(), atom) :: Session.t()
  defp merge_async_task_result(session, task_session, _task_name) do
    session
    |> merge_results(task_session.results)
    |> merge_metrics(task_session.metrics)
    |> merge_errors(task_session.errors)
  end

  @doc """
  Merges two session's results & metrics and returns the resulting session.
  """
  @spec merge(Session.t(), Session.t()) :: Session.t()
  def merge(session, other_session) do
    session
    |> merge_results(other_session |> session_results)
    |> merge_metrics(other_session |> session_metrics)
    |> merge_errors(other_session |> session_errors)
  end

  @doc """
  Merges results of two sessions.
  """
  @spec merge_results(Session.t(), map) :: Session.t()
  def merge_results(session, results) do
    update_in(session.results, &preserve_vals_merge(&1, results))
  end

  @doc """
  Merges metrics of two sessions.
  """
  @spec merge_metrics(Session.t(), map) :: Session.t()
  def merge_metrics(session, metrics) do
    update_in(session.metrics, &preserve_vals_merge(&1, metrics))
  end

  @doc """
  Merges errors of two sessions.
  """
  def merge_errors(session, errors) do
    update_in(session.errors, &preserve_vals_merge(&1, errors))
  end

  @doc """
  Returns `session`'s results wrapped with `session`'s name.
  """
  def session_results(session) do
    session.results
    |> map_nested_put(:session_name, session |> name)
  end

  @doc """
  Returns `session`'s metrics wrapped with `session`'s name.
  """
  def session_metrics(session) do
    session.metrics
    |> map_nested_put(:session_name, session |> name)
  end

  @doc """
  Returns `session`'s errors wrapped with `session`'s name.
  """
  def session_errors(session) do
    session.errors
    |> map_nested_put(:session_name, session |> name)
  end

  @doc """
  Returns the `session`s configured name or scenario's module name.
  """
  def name(session) do
    name = session.config[:session_name] || session.name
    "#{session.id} #{name}"
  end

  @doc """
  Returns `{:ok, reason}`.
  """
  @spec ok(Session.t()) :: {:ok, Session.t()}
  def ok(session), do: {:ok, session}

  @doc """
  Returns a `Chaperon.Session.Error` for the given `session` and with a given
  `reason`.
  """
  @spec error(Session.t(), any) :: {:error, Error.t()}
  def error(session, reason) do
    {:error, %Error{reason: reason, session: session}}
  end

  @doc """
  Returns a `Chaperon.Session.Error` for the given `session` and with a given
  `reason` for a given `action`.
  """
  # @spec error(Session.t(), any, any) :: {:error, Error.t()}
  def error(session, reason, action) do
    {:error, %Chaperon.Action.Error{reason: reason, action: action, session: session}}
  end

  @doc """
  Runs a potentially configured callback for a given action in case of success.
  In case of failure, runs the configured error callback with an
  `{:error, reason}` tuple.

  For more info have a look at `Chaperon.Action.callback/1` and
  `Chaperon.Action.error_callback/1`.
  """
  def run_callback(session, %{callback: nil}, _), do: session

  def run_callback(session, action = %{decode: _decode_options}, response) do
    cb = Chaperon.Action.callback(action)
    error_cb = Chaperon.Action.error_callback(action)

    case decode_response(action, response) do
      {:ok, result} ->
        session
        |> call_callback(cb, result)

      err ->
        error =
          session
          |> error("Response (#{inspect(response)}) decoding failed: #{inspect(err)}")

        session
        |> add_error(action, error)
        |> call_callback(error_cb, err)
    end
  end

  def run_callback(session, action, response) do
    session
    |> call_callback(Chaperon.Action.callback(action), response)
  end

  @doc """
  Calls a `callback` with `session` and an additional argument.

  If the given callback is nil, simply returns `session`.
  If the callback is a function, call it with `session` and the extra argument.
  If it's an atom, call the function with that name in `session`'s currently
  running scenario module.
  """
  @spec call_callback(Session.t(), result_callback, any) :: Session.t()
  def call_callback(session, nil, _), do: session

  def call_callback(session, func_name, arg) when is_atom(func_name),
    do: apply(session.scenario.module, func_name, [session, arg])

  def call_callback(session, cb, arg) when is_function(cb), do: cb.(session, arg)

  def run_error_callback(session, action, response) do
    session
    |> call_callback(Chaperon.Action.error_callback(action), response)
  end

  defp maybe_spawn_interval_task(session) do
    task =
      case session.config[:interval] do
        {interval, fun} ->
          Task.async(fn -> session |> interval_fun(interval, fun) end)

        nil ->
          nil
      end

    %{session | interval_task: task}
  end

  defp interval_fun(session, interval, fun) do
    receive do
      :end_interval_task ->
        session
    after
      interval ->
        session
        |> fun.()
        |> interval_fun(interval, fun)
    end
  end

  defp maybe_end_interval_task(session = %{interval_task: nil}), do: session

  defp maybe_end_interval_task(session = %{interval_task: task}) do
    send(task.pid, :end_interval_task)
    interval_session = Task.await(task)

    session
    |> merge_async_task_result(interval_session, nil)
    |> Map.put(:interval_task, nil)
  end

  defp decode_response(action, response) do
    response_body =
      case response do
        %HTTPoison.Response{body: body} ->
          body

        s when is_binary(s) ->
          s
      end

    case action.decode do
      nil ->
        {:ok, response}

      :json ->
        Poison.decode(response_body, keys: :atoms)

      decode when is_function(decode) ->
        decode.(response_body)
    end
  end

  def reset_action_metadata(session) do
    %{session | metrics: %{}, results: %{}, errors: %{}, async_tasks: %{}}
  end

  @doc """
  Records a custom metric for the duration of calling a given function with the
  current `Chaperon.Session`.

  Example:

      # records a metric named :my_metric with the duration of calling
      # the given function in the module with the given args.
      session
      |> time(:my_action, MyModule, :my_func, [arg1, arg2])

      # this would record the duration of calling:
      # MyModule.my_func(session, arg1, arg2)
  """
  @spec time(Session.t(), metric, atom, atom, [any]) :: Session.t()
  def time(session, metric, module, func, args \\ []) do
    start = timestamp()
    session = apply(module, func, [session | args])

    session
    |> add_metric(metric, timestamp() - start)
  end

  @doc """
  Records a custom metric for the duration of calling a given function with the
  current `Chaperon.Session`.

  Example:

      # records a metric named :my_metric with the duration of calling
      # the given function
      session
      |> time(:my_action, fn session ->
        # do stuff with session
        # and at the end return session from inside this function
      end)
  """
  @spec time(Session.t(), metric, (Session.t() -> Session.t())) :: Session.t()
  def time(session, metric, func) do
    start = timestamp()

    session =
      case func do
        f when is_atom(f) ->
          apply(session.scenario.module, f, [session])

        f when is_function(f) ->
          f.(session)
      end

    session
    |> add_metric(metric, timestamp() - start)
  end

  @spec abort(Session.t(), String.t()) :: Session.t()
  def abort(session, reason) when is_binary(reason) do
    %{session | cancellation: reason}
  end

  def abort(session, reason) do
    session
    |> abort(inspect(reason))
  end

  @doc """
  Makes a given function call async for `session`.

  ## Example

      session
      ~> foo
      ~> bar(1,2,3)

  Is the same as:

      session
      |> async(:foo)
      |> async(:bar, [1,2,3])
  """
  defmacro session ~> {func, _, nil} do
    quote do
      unquote(session)
      |> Chaperon.Session.async(unquote(func))
    end
  end

  defmacro session ~> {task_name, _, args} do
    quote do
      unquote(session)
      |> Chaperon.Session.async(unquote(task_name), unquote(args))
    end
  end

  @doc """
  Awaits a given async task within `session`.

  ## Example

      session
      <~ foo
      <~ bar

  Is the same as:

      session
      |> await(:foo)
      |> await(:bar)
  """
  defmacro session <~ {task_name, _, _} do
    quote do
      unquote(session)
      |> Chaperon.Session.await(unquote(task_name))
    end
  end

  @doc """
  Wraps a function call with `session` as an arg in a call to
  `Chaperon.Session.call_traced` and captures function call duration metrics in
  `session`.

  ## Example

      session
      >>> foo
      >>> bar(1,2,3)

  Is the same as:

      session
      |> call_traced(:foo)
      |> call_traced(:bar, [1,2,3])
  """
  defmacro session >>> {func, _, nil} do
    quote do
      unquote(session)
      |> Chaperon.Session.call_traced(unquote(func))
    end
  end

  defmacro session >>> {func, _, args} do
    quote do
      unquote(session)
      |> Chaperon.Session.call_traced(unquote(func), unquote(args))
    end
  end

  def id_string(%Chaperon.Session{id: id, parent_id: nil}) do
    id
  end

  def id_string(%Chaperon.Session{id: id, parent_id: parent_id}) do
    id <> " [" <> parent_id <> "]"
  end
end

defimpl String.Chars, for: Chaperon.Session do
  def to_string(session) do
    case session.scenario do
      %Chaperon.Scenario{module: scenario_mod} ->
        case session.parent_id do
          nil ->
            "Session{id: #{inspect(session.id)}, scenario: #{inspect(scenario_mod)}}"

          parent_id ->
            "Session{id: #{inspect(session.id)}, scenario: #{inspect(scenario_mod)}, parent_id: #{
              inspect(parent_id)
            }}"
        end

      nil ->
        "Session{id: #{inspect(session.id)}}"
    end
  end
end