lib/glific/processor/consumer_flow.ex

defmodule Glific.Processor.ConsumerFlow do
  @moduledoc """
  Given a message, run it through the flow engine. This is an auxiliary module to help
  consumer_worker, which is the main workhorse.
  """

  import Ecto.Query, warn: false

  alias Glific.{
    Contacts.Contact,
    Flows,
    Flows.FlowContext,
    Flows.Periodic,
    Messages,
    Messages.Message,
    Partners
  }

  @doc """
  Build the state map (kept in the gen_server state) that is needed to
  process messages for the given organization.

  The returned map has two keys:

    * `:flow_keywords` - the result of `Flows.flow_keywords_map/1`
    * `:regx_flow` - the `regx_flow` field of the organization
  """
  @spec load_state(non_neg_integer) :: map()
  def load_state(organization_id) do
    keywords = Flows.flow_keywords_map(organization_id)
    organization = Partners.organization(organization_id)

    %{flow_keywords: keywords, regx_flow: organization.regx_flow}
  end

  @doc false
  @spec process_message({Message.t(), map()}, String.t()) :: {Message.t(), map()}
  def process_message({message, state}, body) do
    # A draft keyword bypasses the ignore-keywords setting and starts the
    # draft flow (issue #621), so the contact's flows are marked complete first.
    draft_keyword = is_draft_keyword?(state, body)

    if draft_keyword do
      FlowContext.mark_flows_complete(
        message.contact_id,
        false,
        source: "process_message",
        event_meta: %{is_draft: draft_keyword, body: body}
      )
    end

    active_context = FlowContext.active_context(message.contact_id)

    # The optin flow is checked before anything else; when it is not needed
    # the message goes through the regular processing. The active context may
    # be nil at this point.
    if start_optin_flow?(message.contact, active_context, body) do
      start_optin_flow(message, state)
    else
      move_forward({message, state}, body, active_context, is_draft: draft_keyword)
    end
  end

  # Value passed as the `delay:` option when starting the new-contact flow
  # and the regex-matched flow.
  # Setting this to 0 since we are pushing out our own optin flow
  @delay_time 0
  # Prefix that marks a draft keyword; also used as the status of draft flows
  @draft_phrase "draft"
  # Status used when looking up and starting published flows
  @final_phrase "published"
  # Keyword used to look up the optin flow
  @optin_flow_keyword "optin"

  @doc """
  In case contact is not in optin flow let's move ahead with the regular processing.

  The checks are evaluated in order and the first one that matches wins:

    1. There is an active context and its flow has `ignore_keywords` set:
       the message is handed to that context.
    2. `state` carries a truthy `:newcontact` and the flow keywords contain
       `"org_default_new_contact"`: that flow is started.
    3. `body` is a key of the published flow keywords: that flow is started.
    4. `opts[:is_draft]` is truthy: the draft flow named in `message.body`
       (with the leading `"draft:"` removed) is started.
    5. There is no active context and `message.body` matches
       `state.regx_flow`: the flow configured there is started.
    6. There is no active context: `Periodic.run_flows/2` is called and the
       state it returns is used.
    7. Otherwise the message is handed to the active context.

  `context` is `nil` when the contact has no active flow context.
  Returns the `{message, state}` tuple produced by the selected branch.
  """
  @spec move_forward({Message.t(), map()}, String.t(), FlowContext.t() | nil, Keyword.t()) ::
          {Message.t(), map()}
  def move_forward({message, state}, body, context, opts) do
    cond do
      continue_the_context?(context) ->
        continue_current_context(context, message, body, state)

      start_new_contact_flow?(state) ->
        flow_id = state.flow_keywords["org_default_new_contact"]
        flow_params = {:flow_id, flow_id, @final_phrase}
        start_new_flow(message, body, state, delay: @delay_time, flow_params: flow_params)

      is_flow_keyword?(state, body) ->
        flow_params = {:flow_keyword, body, @final_phrase}
        start_new_flow(message, body, state, flow_params: flow_params)

      Keyword.get(opts, :is_draft, false) ->
        # the keyword is taken from the original message body, not from `body`
        draft_keyword = String.replace_leading(message.body, @draft_phrase <> ":", "")
        flow_params = {:flow_keyword, draft_keyword, @draft_phrase}
        draft_opts = [status: @draft_phrase, flow_params: flow_params]
        start_new_flow(message, message.body, state, draft_opts)

      # making sure that user is not in any flow.
      is_context_nil?(context) && match_with_regex?(state.regx_flow, message.body) ->
        flow_id = Glific.parse_maybe_integer!(state.regx_flow.flow_id)
        flow_params = {:flow_id, flow_id, @final_phrase}
        start_new_flow(message, body, state, delay: @delay_time, flow_params: flow_params)

      is_context_nil?(context) ->
        state = Periodic.run_flows(state, message)
        {message, state}

      # context is guaranteed to be non-nil here because of the branch above
      true ->
        continue_current_context(context, message, body, state)
    end
  end

  @doc """
  Start a flow or reactivate a flow if needed. This will be linked to the entire
  trigger mechanism once we have that under control.

  The flow is looked up from the `:flow_params` option. When it is found, a
  context is initialized for the message's contact using the `:status` option
  (defaulting to the published status), and `message.body` is added to the
  options as `:flow_keyword`. When the lookup returns an error nothing is
  started. The `{message, state}` tuple is returned in both cases.
  """
  @spec start_new_flow(atom() | Message.t(), String.t(), map(), Keyword.t()) ::
          {Message.t(), map()}
  def start_new_flow(message, _body, state, opts \\ []) do
    flow_params = Keyword.get(opts, :flow_params)
    status = Keyword.get(opts, :status, @final_phrase)

    case Flows.get_cached_flow(message.organization_id, flow_params) do
      {:error, _} ->
        nil

      {:ok, flow} ->
        context_opts = Keyword.put(opts, :flow_keyword, message.body)
        FlowContext.init_context(flow, message.contact, status, context_opts)
    end

    {message, state}
  end

  @doc false
  @spec continue_current_context(
          FlowContext.t() | nil,
          atom() | Message.t(),
          String.t(),
          map()
        ) ::
          {Message.t(), map()}
  def continue_current_context(context, message, _body, state) do
    flow_key = {:flow_uuid, context.flow_uuid, context.status}
    {:ok, flow} = Flows.get_cached_flow(message.organization_id, flow_key)

    # tag the message with the flow it is being processed by
    {:ok, message} = Messages.update_message(message, %{flow_id: context.flow_id})

    loaded_context =
      context
      |> Map.put(:last_message, message)
      |> FlowContext.load_context(flow)

    # message.body is used here because we want the original message, not the
    # stripped version. It is unclear why a temp message is created instead of
    # reusing the existing message; the plan was to switch to using message
    # in the next release (1.0.1).
    temp_message =
      Messages.create_temp_message(
        message.organization_id,
        message.body,
        type: message.type,
        id: message.id,
        media: message.media,
        media_id: message.media_id,
        location: message.location,
        interactive_content: message.interactive_content
      )

    FlowContext.step_forward(loaded_context, temp_message)

    {message, state}
  end

  # A body is a draft keyword when it starts with the draft phrase and what is
  # left after stripping the leading draft phrase is a key of the "draft"
  # keywords map. A nil body is never a draft keyword.
  @spec is_draft_keyword?(map(), String.t()) :: boolean()
  defp is_draft_keyword?(_state, nil), do: false

  defp is_draft_keyword?(state, body) do
    String.starts_with?(body, @draft_phrase) and
      Map.has_key?(
        state.flow_keywords["draft"],
        String.replace_leading(body, @draft_phrase, "")
      )
  end

  ## The optin flow is needed when the contact has an optout time and is not
  ## already inside the optin flow.
  @spec start_optin_flow?(Contact.t(), FlowContext.t() | nil, String.t()) :: boolean()
  defp start_optin_flow?(contact, nil, _body) do
    contact.optout_time != nil
  end

  defp start_optin_flow?(contact, active_context, body) do
    if Flows.is_optin_flow?(active_context.flow) do
      false
    else
      # not in the optin flow: decide from the optout time alone
      start_optin_flow?(contact, nil, body)
    end
  end

  # Marks the contact's existing flows complete and, when the optin flow can
  # be looked up, initializes a context for it. Always returns {message, state}.
  @spec start_optin_flow(Message.t(), map()) :: {Message.t(), map()}
  defp start_optin_flow(message, state) do
    ## remove all the previous flow context
    FlowContext.mark_flows_complete(
      message.contact_id,
      false,
      source: "start_optin_flow",
      event_meta: %{message_id: message.id}
    )

    optin_lookup =
      Flows.get_cached_flow(
        message.organization_id,
        {:flow_keyword, @optin_flow_keyword, @final_phrase}
      )

    case optin_lookup do
      {:error, _} ->
        nil

      {:ok, flow} ->
        FlowContext.init_context(flow, message.contact, @final_phrase, is_draft: false)
    end

    {message, state}
  end

  # True when the state is flagged with :newcontact and a flow is registered
  # under the "org_default_new_contact" keyword.
  @spec start_new_contact_flow?(map()) :: boolean()
  defp start_new_contact_flow?(state) do
    new_contact = Map.get(state, :newcontact, false)

    # the keywords are only looked at when the new contact flag is set
    new_contact && state.flow_keywords["org_default_new_contact"] != nil
  end

  # True when body is a key of the "published" flow keywords map.
  @spec is_flow_keyword?(map(), String.t()) :: boolean()
  defp is_flow_keyword?(state, body) do
    published_keywords = state.flow_keywords["published"]
    Map.has_key?(published_keywords, body)
  end

  # Checks whether body matches the regex configured in regx_flow.
  #
  # Returns false when either argument is nil, when regx_flow is not a map
  # with a string :regx pattern, or when the pattern/options do not compile.
  @spec match_with_regex?(map() | nil, String.t() | nil) :: boolean()
  defp match_with_regex?(regx_flow, body) when nil in [regx_flow, body], do: false

  defp match_with_regex?(%{regx: regx} = regx_flow, body) when is_binary(regx) do
    # Regex.compile/2 only accepts a binary or a list of options and raises on
    # nil, so a missing options value is treated as "no options"
    regx_opt = Map.get(regx_flow, :regx_opt) || ""

    case Regex.compile(regx, regx_opt) do
      {:ok, rgx} -> String.match?(body, rgx)
      _ -> false
    end
  end

  defp match_with_regex?(_, _), do: false

  # True only when there is a context and its flow has ignore_keywords set,
  # i.e. the message must go to that context regardless of keywords.
  # The context is nil when the contact has no active flow context.
  @spec continue_the_context?(FlowContext.t() | nil) :: boolean()
  defp continue_the_context?(nil), do: false

  defp continue_the_context?(context) do
    # normalise any truthy/falsy value to a strict boolean
    if context.flow.ignore_keywords, do: true, else: false
  end

  ## True when there is no context. A dialyzer error was reported on this
  ## function in the past and was left unresolved.
  @spec is_context_nil?(FlowContext.t() | nil) :: boolean()
  defp is_context_nil?(nil), do: true
  defp is_context_nil?(_context), do: false
end