lib/glific/flows/flow_context.ex

defmodule Glific.Flows.FlowContext do
  @moduledoc """
  When we are running a flow, we are running it in the context of a
  contact and/or a conversation (or other Glific data types). Let encapsulate
  this in a module and isolate the flow from the other aspects of Glific
  """
  alias __MODULE__

  use Ecto.Schema
  import Ecto.Changeset
  import Ecto.Query, warn: false
  import GlificWeb.Gettext
  require Logger

  alias Glific.{
    Contacts,
    Contacts.Contact,
    Flows,
    Flows.Flow,
    Flows.FlowResult,
    Flows.MessageBroadcast,
    Flows.MessageVarParser,
    Flows.Node,
    Messages,
    Messages.Message,
    Notifications,
    Partners.Organization,
    Profiles.Profile,
    Repo
  }

  @required_fields [:contact_id, :flow_id, :flow_uuid, :status, :organization_id]
  @optional_fields [
    :node_uuid,
    :parent_id,
    :results,
    :wakeup_at,
    :is_background_flow,
    :is_await_result,
    :is_killed,
    :completed_at,
    :delay,
    :uuids_seen,
    :uuid_map,
    :recent_inbound,
    :recent_outbound,
    :message_broadcast_id,
    :profile_id
  ]

  # we store one more than the number of messages specified here
  @max_message_len 11

  @type t :: %__MODULE__{
          __meta__: Ecto.Schema.Metadata.t(),
          uuid_map: map() | nil,
          results: map() | nil,
          contact_id: non_neg_integer | nil,
          contact: Contact.t() | Ecto.Association.NotLoaded.t() | nil,
          last_message: Message.t() | nil,
          flow_id: non_neg_integer | nil,
          flow_uuid: Ecto.UUID.t() | nil,
          flow: Flow.t() | Ecto.Association.NotLoaded.t() | nil,
          organization_id: non_neg_integer | nil,
          organization: Organization.t() | Ecto.Association.NotLoaded.t() | nil,
          status: String.t() | nil,
          parent_id: non_neg_integer | nil,
          parent: FlowContext.t() | Ecto.Association.NotLoaded.t() | nil,
          message_broadcast_id: non_neg_integer | nil,
          message_broadcast: Message.t() | Ecto.Association.NotLoaded.t() | nil,
          profile_id: non_neg_integer | nil,
          profile: Profile.t() | Ecto.Association.NotLoaded.t() | nil,
          node_uuid: Ecto.UUID.t() | nil,
          node: Node.t() | nil,
          delay: integer,
          uuids_seen: map(),
          recent_inbound: [map()] | [],
          recent_outbound: [map()] | [],
          wakeup_at: :utc_datetime | nil,
          is_background_flow: boolean,
          is_await_result: boolean,
          is_killed: boolean,
          completed_at: :utc_datetime | nil,
          inserted_at: :utc_datetime | nil,
          updated_at: :utc_datetime | nil
        }

  schema "flow_contexts" do
    field(:uuid_map, :map, virtual: true)
    field(:node, :map, virtual: true)

    field(:results, :map, default: %{})

    field(:node_uuid, Ecto.UUID)
    field(:flow_uuid, Ecto.UUID)

    field(:status, :string, default: "published")

    field(:wakeup_at, :utc_datetime, default: nil)
    field(:completed_at, :utc_datetime, default: nil)

    field(:is_background_flow, :boolean, default: false)
    field(:is_await_result, :boolean, default: false)
    field(:is_killed, :boolean, default: false)

    field(:delay, :integer, default: 0, virtual: true)

    # keep a map of all uuids we encounter (start with flows)
    # this allows to to detect infinite loops and abort
    field(:uuids_seen, :map, default: %{}, virtual: true)

    field(:recent_inbound, {:array, :map}, default: [])
    field(:recent_outbound, {:array, :map}, default: [])

    field(:last_message, :map, virtual: true)

    belongs_to(:contact, Contact)
    belongs_to(:flow, Flow)
    belongs_to(:organization, Organization)
    belongs_to(:parent, FlowContext, foreign_key: :parent_id)
    belongs_to(:profile, Profile)
    # the originating group message which kicked off this flow if any
    belongs_to(:message_broadcast, MessageBroadcast)

    timestamps(type: :utc_datetime)
  end

  @doc """
  Standard changeset pattern we use for all data types
  """
  @spec changeset(FlowContext.t(), map()) :: Ecto.Changeset.t()
  def changeset(context, attrs) do
    context
    |> cast(attrs, @required_fields ++ @optional_fields)
    |> validate_required(@required_fields)
    |> foreign_key_constraint(:contact_id)
    |> foreign_key_constraint(:flow_id)
    |> foreign_key_constraint(:parent_id)
  end

  @doc """
  Create a FlowContext
  """
  @spec create_flow_context(map()) :: {:ok, FlowContext.t()} | {:error, Ecto.Changeset.t()}
  def create_flow_context(attrs \\ %{}) do
    %FlowContext{}
    |> FlowContext.changeset(attrs)
    |> Repo.insert()
  end

  @doc false
  @spec update_flow_context(FlowContext.t(), map()) ::
          {:ok, FlowContext.t()} | {:error, Ecto.Changeset.t()}
  def update_flow_context(context, attrs) do
    context
    |> FlowContext.changeset(attrs)
    |> Repo.update()
  end

  @doc """
  Generate a notification having all the flow context data.
  """
  @spec notification(FlowContext.t(), String.t()) :: nil
  def notification(context, message) do
    context = Repo.preload(context, [:flow])
    Logger.info(message)

    {:ok, _} =
      Notifications.create_notification(%{
        category: "Flow",
        message: message,
        severity: Notifications.types().warning,
        organization_id: context.organization_id,
        entity: %{
          contact_id: context.contact_id,
          flow_id: context.flow_id,
          flow_uuid: context.flow.uuid,
          parent_id: context.parent_id,
          name: context.flow.name
        }
      })

    nil
  end

  @doc """
  Resets all the context for the user when we hit an error. This can potentially
  prevent an infinite loop from happening if flows are connected in a cycle
  """
  @spec reset_all_contexts(FlowContext.t(), String.t()) :: FlowContext.t() | nil
  def reset_all_contexts(context, message) do
    # lets skip logging and notifications for things that occur quite often
    if !Glific.ignore_error?(message) do
      Logger.info(message)
      notification(context, message)
    end

    # lets reset the entire flow tree complete if this context is a child
    if context.parent_id,
      do:
        mark_flows_complete(context.contact_id, false,
          source: "reset_all_contexts",
          event_meta: %{
            context_id: context.id,
            message: message
          }
        )

    # lets reset the current context and return the reset context
    reset_one_context(context,
      is_killed: true,
      source: "reset_all_contexts",
      event_meta: %{
        message: message
      }
    )
  end

  @doc """
  Reset this context, but don't follow parent context tail. This is used
  for tail call optimization
  """
  @spec reset_one_context(FlowContext.t(), Keyword.t()) :: FlowContext.t()
  def reset_one_context(context, opts \\ []) do
    is_killed = Keyword.get(opts, :is_killed, false)
    message = get_in(opts, [:event_meta, :message])
    parent_id = get_in(opts, [:event_meta, :parent_id])
    source = Keyword.get(opts, :source, "")

    {:ok, context} =
      update_flow_context(
        context,
        %{
          completed_at: DateTime.utc_now(),
          is_killed: is_killed
        }
      )

    :telemetry.execute(
      [:glific, :flow, :stop],
      %{},
      %{
        id: context.flow_id,
        context_id: context.id,
        contact_id: context.contact_id,
        organization_id: context.organization_id
      }
    )

    context = Repo.preload(context, [:flow, :contact])

    event_label =
      cond do
        !is_nil(message) && source == "reset_all_contexts" ->
          "Flow terminated abruptly"

        !is_nil(parent_id) ->
          "Child Flow Completed"

        true ->
          "Flow Completed"
      end

    {:ok, _} =
      Contacts.capture_history(context.contact, :contact_flow_ended, %{
        event_label: event_label,
        event_meta:
          %{
            context_id: context.id,
            source: Keyword.get(opts, :source, ""),
            flow: %{
              id: context.flow.id,
              name: context.flow.name,
              uuid: context.flow.uuid
            }
          }
          |> Map.merge(Keyword.get(opts, :event_meta, %{}))
      })

    context
  end

  @doc """
  Resets the context and sends control back to the parent context
  if one exists
  """
  @min_delay 2

  @spec reset_context(FlowContext.t()) :: FlowContext.t() | nil
  def reset_context(context) do
    Logger.info("Ending Flow: id: '#{context.flow_id}', contact_id: '#{context.contact_id}'")

    # we first update this entry with the completed at time
    context =
      reset_one_context(context,
        source: "reset_context",
        event_meta: %{
          parent_id: context.parent_id
        }
      )

    # check if context has a parent_id, if so, we need to
    # load that context and keep going
    if context.parent_id do
      # we load the parent context, and resume it with a message of "Completed"
      parent = active_context(context.contact_id, context.parent_id)

      # ensure the parent is still active. If the parent completed (or was terminated)
      # we don't get back a valid parent
      if parent do
        Logger.info(
          "Resuming Parent Flow: id: '#{parent.flow_id}', contact_id: '#{context.contact_id}'"
        )

        ## add delay so that it does not execute the message before sub flows
        ## adding this line separately so that we can easily identify this in different cases.

        parent = Map.put(parent, :delay, max(context.delay + @min_delay, @min_delay))

        parent
        |> load_context(Flow.get_flow(context.organization_id, parent.flow_uuid, context.status))
        |> merge_child_results(context)
        |> step_forward(Messages.create_temp_message(context.organization_id, "completed"))
      end
    end

    # return the original context, which is now completed
    context
  end

  @spec merge_child_results(FlowContext.t(), FlowContext.t()) :: FlowContext.t()
  defp merge_child_results(parent, child) do
    # merge the child results into parent
    # but lets remove the parent result field to keep the map simple
    child_results = Map.delete(child.results, "parent")

    if child_results == %{} do
      parent
    else
      child_results = %{
        # for now commenting this out since folks have complex flows
        # "child #{child.flow_id}" => child_results,
        "child" => child_results
      }

      update_results(parent, child_results)
    end
  end

  @doc """
  Update the recent_* state as we consume or send a message
  """
  @spec update_recent(FlowContext.t(), map(), atom()) ::
          FlowContext.t()
  def update_recent(context, msg, type) do
    now = DateTime.utc_now()

    # since we are storing in DB and want to avoid hassle of atom <-> string conversion
    # we'll always use strings as keys

    messages =
      [
        %{
          "contact" => %{
            uuid: context.contact_id,
            name: context.contact.name
          },
          "message" => msg.body,
          "message_id" => msg.id,
          "date" => now,
          "node_uuid" => context.node_uuid
        }
        | Map.get(context, type)
      ]
      |> Enum.slice(0..@max_message_len)

    # since we have recd a message, we also ensure that we are not going to be woken
    # up by a timer if present.
    {:ok, context} =
      update_flow_context(context, %{type => messages, wakeup_at: nil, is_background_flow: false})

    context
  end

  @doc """
  Update the contact results with each element of the json map
  """
  @spec update_results(FlowContext.t(), map() | nil) :: FlowContext.t()
  def update_results(context, result) do
    results =
      if context.results == %{} || is_nil(context.results),
        do: result,
        else: Map.merge(context.results, result)

    {:ok, context} = update_flow_context(context, %{results: results})

    args = %{
      results: results,
      contact_id: context.contact_id,
      flow_id: context.flow_id,
      flow_version: context.flow.version,
      flow_context_id: context.id,
      flow_uuid: context.flow_uuid,
      organization_id: context.organization_id
    }

    # we try the upsert twice in case the first one conflicts with another
    # simultaneous insert. Happens rarely but a couple of times.
    case FlowResult.upsert_flow_result(args) do
      {:ok, flow_result} -> flow_result
      {:error, _} -> FlowResult.upsert_flow_result(args)
    end

    context
  end

  @spec get_datetime(map()) :: DateTime.t()
  defp get_datetime(item) do
    # sometime we get this from memory, and its not retrieved from DB
    # in which case its already in a valid date format
    if is_binary(item["date"]) do
      {:ok, date, _} = DateTime.from_iso8601(item["date"])
      date
    else
      item["date"]
    end
  end

  @doc """
  Count the number of times we have sent the same message in the recent past
  """
  @spec match_outbound(FlowContext.t(), String.t(), integer) :: integer
  def match_outbound(context, _body, go_back \\ 6) do
    since = Glific.go_back_time(go_back)

    Enum.filter(
      context.recent_outbound,
      fn item ->
        date = get_datetime(item)

        # comparing node uuids is a lot more powerful than comparing message body
        item["node_uuid"] == context.node_uuid and
          DateTime.compare(date, since) in [:gt, :eq]
      end
    )
    |> length()
  end

  @doc """
  Set the new node for the context
  """
  @spec set_node(FlowContext.t(), Node.t()) :: FlowContext.t()
  def set_node(context, node) do
    {:ok, context} = update_flow_context(context, %{node_uuid: node.uuid})
    %{context | node: node}
  end

  @doc """
  Execute one (or more) steps in a flow based on the message stream
  """
  @spec execute(FlowContext.t(), [Message.t()]) ::
          {:ok | :wait, FlowContext.t(), [Message.t()]} | {:error, String.t()}
  def execute(%FlowContext{node: node} = _context, _messages) when is_nil(node),
    do: {:error, dgettext("errors", "We have finished the flow")}

  def execute(context, messages) do
    case Node.execute(context.node, context, messages) do
      {:ok, context, []} ->
        {:ok, context, []}

      {:wait, context, messages} ->
        {:wait, context, messages}

      # Routers basically break the processing, and return back to the top level
      # and hence we hit this case. Since they can be multiple routers stacked (e.g. when
      # the flow has multiple webhooks in it), we recurse till we no longer change state
      {:ok, context, new_messages} ->
        # if we've consumed some messages, lets continue calling the function,
        # till we consume all messages that we potentially can
        if messages != new_messages do
          execute(context, new_messages)
        else
          # lets discard the message stream and go forward
          {:ok, context, []}
        end

      others ->
        others
    end
  end

  # this marks complete all the context which are newer than date
  # this is used when a background flow  wakes up, and it has no
  # idea what happened it was sleeping
  @spec add_date_clause(Ecto.Query.t(), DateTime.t() | nil) :: Ecto.Query.t()
  defp add_date_clause(query, nil), do: query

  defp add_date_clause(query, after_insert),
    do: query |> where([fc], fc.inserted_at > ^after_insert)

  @doc """
  Set all the flows for a specific context to be completed
  """
  @spec mark_flows_complete(non_neg_integer, boolean(), Keyword.t()) :: nil
  def mark_flows_complete(_contact_id, _is_background_flow, opts \\ [])
  def mark_flows_complete(_contact_id, true, _opts), do: nil

  def mark_flows_complete(contact_id, false, opts) do
    after_insert_date = Keyword.get(opts, :after_insert_date, nil)
    source = Keyword.get(opts, :source, "")

    now = DateTime.utc_now()

    FlowContext
    |> where([fc], fc.contact_id == ^contact_id)
    |> where([fc], is_nil(fc.completed_at))
    |> add_date_clause(after_insert_date)
    # lets not touch the contexts which are waiting to be woken up at a specific time
    |> where([fc], fc.is_background_flow == false)
    |> Repo.update_all(set: [completed_at: now, updated_at: now, is_killed: true])

    event_label =
      cond do
        source == "terminate_contact_flows" ->
          "Last Active flow is terminated"

        is_nil(after_insert_date) && source == "init_context" ->
          "Last Active flow is killed as new flow has started"

        source == "wakeup_one" ->
          "Flow waked up, marking all other flows as completed"

        true ->
          "Mark all the flow as completed."
      end

    {:ok, _} =
      Contacts.capture_history(contact_id, :contact_flow_ended_all, %{
        event_label: event_label,
        event_meta:
          %{
            after_insert_date: after_insert_date,
            source: Keyword.get(opts, :source, "")
          }
          |> Map.merge(Keyword.get(opts, :event_meta, %{}))
      })

    :telemetry.execute(
      [:glific, :flow, :stop_all],
      %{},
      %{
        contact_id: contact_id,
        organization_id: Repo.get_organization_id()
      }
    )
  end

  ## If flow starts with a keyword then add the keyword to the context results
  @spec default_results(Keyword.t()) :: map()
  defp default_results(opts) do
    flow_keyword = Keyword.get(opts, :flow_keyword, "")
    initial_results = Keyword.get(opts, :default_results, %{}) || %{}

    if flow_keyword in [nil, ""] do
      initial_results
    else
      %{
        "flow_keyword" => %{
          "input" => Glific.string_clean(flow_keyword),
          "category" => flow_keyword,
          "inserted_at" => DateTime.utc_now()
        }
      }
      |> Map.merge(initial_results)
    end
  end

  @doc """
  Seed the context and set the wake up time as needed
  """
  @spec seed_context(Flow.t(), Contact.t(), String.t(), Keyword.t()) ::
          {:ok, FlowContext.t()} | {:error, Ecto.Changeset.t()}
  def seed_context(flow, contact, status, opts \\ []) do
    parent_id = Keyword.get(opts, :parent_id)
    message_broadcast_id = Keyword.get(opts, :message_broadcast_id)
    delay = Keyword.get(opts, :delay, 0)
    uuids_seen = Keyword.get(opts, :uuids_seen, %{})
    wakeup_at = Keyword.get(opts, :wakeup_at)
    initial_results = Keyword.get(opts, :results, default_results(opts))

    Logger.info(
      "Seeding flow: id: '#{flow.id}', parent_id: '#{parent_id}', contact_id: '#{contact.id}'"
    )

    node = flow.start_node

    {:ok, context} =
      create_flow_context(%{
        contact_id: contact.id,
        parent_id: parent_id,
        message_broadcast_id: message_broadcast_id,
        node_uuid: node.uuid,
        flow_uuid: flow.uuid,
        status: status,
        node: node,
        flow_id: flow.id,
        flow: flow,
        organization_id: flow.organization_id,
        uuid_map: flow.uuid_map,
        delay: delay,
        uuids_seen: uuids_seen,
        wakeup_at: wakeup_at
      })

    context =
      if initial_results in [nil, %{}] do
        context
      else
        context
        |> Repo.preload([:flow, :contact])
        |> update_results(initial_results)
      end

    {:ok, context}
  end

  @doc """
  Start a new context, if there is an existing context, blow it away
  """
  @spec init_context(Flow.t(), Contact.t(), String.t(), Keyword.t() | []) ::
          {:ok | :wait, FlowContext.t(), [String.t()]} | {:error, String.t()}
  def init_context(flow, contact, status, opts \\ []) do
    parent_id = Keyword.get(opts, :parent_id)
    # set all previous context to be completed if we are not starting a sub flow
    if is_nil(parent_id) do
      mark_flows_complete(contact.id, flow.is_background,
        source: "init_context",
        event_meta: %{
          flow_id: flow.id,
          parent_id: parent_id,
          status: status
        }
      )
    end

    {:ok, context} = seed_context(flow, contact, status, opts)

    :telemetry.execute(
      [:glific, :flow, :start],
      %{duration: 1},
      %{
        id: flow.id,
        context_id: context.id,
        contact_id: contact.id,
        organization_id: context.organization_id
      }
    )

    {:ok, _} =
      Contacts.capture_history(contact, :contact_flow_started, %{
        event_label: "Flow Started",
        event_meta: %{
          context_id: context.id,
          flow: %{
            id: flow.id,
            uuid: flow.uuid,
            name: flow.name
          }
        }
      })

    context
    |> load_context(flow)
    # lets do the first steps and start executing it till we need a message
    |> execute([])
  end

  @doc """
  Check if there is an active context (i.e. with a non null, node_uuid for this contact)
  """
  @spec active_context(non_neg_integer, non_neg_integer | nil) :: FlowContext.t() | nil
  def active_context(contact_id, parent_id \\ nil) do
    # need to fix this instead of assuming the highest id is the most
    # active context (or is that a wrong assumption). Maybe a context number? like
    # we do for other tables
    # We should not wake up those contexts which are waiting on time
    query =
      from(fc in FlowContext,
        where:
          fc.contact_id == ^contact_id and
            not is_nil(fc.node_uuid) and
            is_nil(fc.completed_at) and
            fc.is_background_flow == false,
        order_by: [desc: fc.id],
        limit: 1
      )

    query =
      if parent_id,
        do: query |> where([fc], fc.id == ^parent_id),
        else: query

    # There are lot of test cases failing because of this change. Will come back to it end of this PR.
    fc =
      query
      |> Repo.one()
      |> Repo.preload([:contact, :flow])

    # if this is a background flow we skip it
    if fc && fc.is_background_flow,
      do: nil,
      else: fc
  end

  @doc """
  Load the context object, given a flow object and a contact. At some point,
  we'll get the genserver to cache this
  """
  @spec load_context(FlowContext.t(), Flow.t()) :: FlowContext.t()
  def load_context(context, flow) do
    case Map.fetch(flow.uuid_map, context.node_uuid) do
      {:ok, {:node, node}} ->
        context
        |> Repo.preload(:contact)
        |> Map.put(:flow, flow)
        |> Map.put(:uuid_map, flow.uuid_map)
        |> Map.put(:node, node)
        ## We will refactor it more and use it whenever we need this.
        ## Currently to restrict the number changes in the context
        |> set_last_message()

      :error ->
        # Seems like the flow changed underneath us
        # so this node no longer exists. Lets reset the context
        # and terminate the flow, which sets the context.node to nil
        # and hence does not execute
        Logger.error(
          "Seems like the flow: #{flow.id} changed underneath us for: #{context.organization_id}"
        )

        reset_all_contexts(context, "A new flow was published, resetting flows for this contact.")
    end
  end

  @doc """
  Given an input string, consume the input and advance the state of the context
  """
  @spec step_forward(FlowContext.t(), Message.t()) :: {:ok, map()} | {:error, String.t()}
  def step_forward(context, message) do
    case execute(context, [message]) do
      {:ok, context, []} ->
        {:ok, context}

      {:wait, context, _messages} ->
        {:ok, context}

      {:error, error} ->
        Glific.log_error(error)
    end
  end

  @wake_up_flow_limit 500

  @spec wakeup_flows(non_neg_integer) :: any
  @doc """
  Find all the contexts which need to be woken up and processed
  """
  def wakeup_flows(_organization_id) do
    FlowContext
    |> where([fc], not is_nil(fc.wakeup_at))
    |> where([fc], fc.wakeup_at < ^DateTime.utc_now())
    |> where([fc], is_nil(fc.completed_at))
    |> limit(@wake_up_flow_limit)
    |> preload(:flow)
    |> Repo.all()
    |> Enum.each(&wakeup_one(&1))
  end

  @doc """
  Process one context at a time that is ready to be woken
  """
  @spec wakeup_one(FlowContext.t(), Message.t() | nil) ::
          {:ok, FlowContext.t() | nil, [String.t()]} | {:error, String.t()} | nil
  def wakeup_one(context, message \\ nil) do
    # update the context woken up time as soon as possible to avoid someone else
    # grabbing this context
    {:ok, context} =
      update_flow_context(
        context,
        %{
          wakeup_at: nil,
          is_background_flow: false,
          is_await_result: false
        }
      )

    # also mark all newer contexts as completed
    mark_flows_complete(context.contact_id, context.flow.is_background,
      after_insert_date: context.inserted_at,
      source: "wakeup_one",
      event_meta: %{
        context_id: context.id,
        flow_id: context.flow_id,
        message: "#{inspect(message)}"
      }
    )

    {:ok, flow} =
      Flows.get_cached_flow(
        context.organization_id,
        {:flow_uuid, context.flow_uuid, context.status}
      )

    message =
      if is_nil(message),
        do: Messages.create_temp_message(context.organization_id, "No Response"),
        else: message

    context
    |> FlowContext.load_context(flow)
    |> FlowContext.step_forward(message)
    |> case do
      {:ok, context} -> {:ok, context, []}
      {:error, message} -> {:error, message}
    end
  end

  @spec await_context(non_neg_integer, non_neg_integer) :: FlowContext.t() | nil
  defp await_context(contact_id, flow_id) do
    FlowContext
    |> where([fc], fc.contact_id == ^contact_id)
    |> where([fc], fc.flow_id == ^flow_id)
    |> where([fc], fc.is_await_result == true)
    |> where([fc], is_nil(fc.completed_at))
    |> preload(:flow)
    |> Repo.one()
  end

  @doc """
  Resume the flow for a given contact and a given flow id if still active
  """
  @spec resume_contact_flow(
          Contact.t(),
          non_neg_integer | FlowContext.t() | nil,
          map(),
          Message.t() | nil
        ) ::
          {:ok, FlowContext.t() | nil, [String.t()]} | {:error, String.t()} | nil
  def resume_contact_flow(contact, flow_id, result, message \\ nil)

  def resume_contact_flow(contact, flow_id, result, message) when is_integer(flow_id) do
    context = await_context(contact.id, flow_id)
    resume_contact_flow(contact, context, result, message)
  end

  def resume_contact_flow(contact, nil, _result, _message) do
    {:error, "#{contact.id} does not have any active flows awaiting results."}
  end

  def resume_contact_flow(_contact, context, result, message) do
    # first update the flow context with the result
    ## if user don't send any valid map results/params, we will set the result to nil

    result =
      if result in [[], nil],
        do: %{},
        else: result

    context = update_results(context, result)

    # and then proceed as if we are waking the flow up
    wakeup_one(context, message)
  end

  @doc """
  Delete all the contexts which are completed before two days
  """
  @spec delete_completed_flow_contexts(non_neg_integer) :: :ok
  def delete_completed_flow_contexts(back \\ 2) do
    back_date = DateTime.utc_now() |> DateTime.add(-1 * back * 24 * 60 * 60, :second)

    """
    DELETE FROM flow_contexts
    WHERE id = any (array(
       SELECT id
       FROM flow_contexts AS f0
       WHERE f0.completed_at < '#{back_date}' AND F0.completed_at IS NOT NULL LIMIT 500));
    """
    |> Repo.query!([], timeout: 60_000, skip_organization_id: true)

    Logger.info("Deleting flow contexts completed #{back} days back")

    :ok
  end

  @doc """
  Delete all the contexts which are older than 7 days
  """
  @spec delete_old_flow_contexts(non_neg_integer) :: :ok
  def delete_old_flow_contexts(back \\ 7) do
    deletion_date = DateTime.utc_now() |> DateTime.add(-1 * back * 24 * 60 * 60, :second)

    """
    DELETE FROM flow_contexts
    WHERE id = any (array(SELECT id FROM flow_contexts AS f0 WHERE f0.inserted_at < '#{deletion_date}' LIMIT 500));
    """
    |> Repo.query!([], timeout: 60_000, skip_organization_id: true)

    Logger.info("Deleting flow contexts older than #{back} days")

    :ok
  end

  @doc """
    A single place to parse the variable in a string related to flows.
  """
  @spec parse_context_string(FlowContext.t(), String.t()) :: String.t()
  def parse_context_string(context, str) do
    vars = get_vars_to_parse(context)
    MessageVarParser.parse(str, vars)
  end

  @doc """
    A single place to parse the variable in a string related to flows.
  """
  @spec get_vars_to_parse(FlowContext.t()) :: map()
  def get_vars_to_parse(context) do
    %{
      "results" => context.results,
      "contact" => Contacts.get_contact_field_map(context.contact_id),
      "flow" => %{name: context.flow.name, id: context.flow.id, uuid: context.flow.uuid}
    }
  end

  @spec set_last_message(FlowContext.t()) :: FlowContext.t()
  defp set_last_message(%{last_message: message} = context) when message not in [%{}, nil, ""],
    do: context

  defp set_last_message(context) do
    recent_inbounds = get_recent_inbounds(context)

    cond do
      recent_inbounds in [[], nil, %{}] ->
        context

      hd(recent_inbounds)["message_id"] == nil ->
        context

      true ->
        latest_inbound = hd(recent_inbounds)

        message =
          Messages.get_message!(latest_inbound["message_id"])
          |> Repo.preload(contact: [:language])

        Map.put(context, :last_message, message)
    end
  end

  @spec get_recent_inbounds(FlowContext.t()) :: list()
  defp get_recent_inbounds(context) do
    cond do
      context.recent_inbound not in [[], nil, %{}] ->
        context.recent_inbound

      is_nil(context.parent_id) ->
        context.recent_inbound

      true ->
        context = Repo.preload(context, :parent)
        get_recent_inbounds(context.parent)
    end
  end
end