lib/glific/flows/broadcast.ex

defmodule Glific.Flows.Broadcast do
  @moduledoc """
  Start a flow to a group so we can blast it out as soon as
  possible and ensure we are under the rate limits.
  """

  import Ecto.Query, warn: false

  require Logger

  alias Glific.{
    Contacts.Contact,
    Flows,
    Flows.Flow,
    Flows.FlowContext,
    Flows.MessageBroadcast,
    Flows.MessageBroadcastContact,
    Groups.Group,
    Messages,
    Partners,
    Repo
  }

  @status "published"

  @doc """
  Starts a flow for all the contacts of a group.

  Fetches the cached published version of the flow, creates a group message
  announcing the start and then hands a `"flow"` typed attribute map to
  `init_msg_broadcast/2`, which creates the broadcast and its contact rows.
  `default_results` is passed along as one of the broadcast attributes.
  """
  @spec broadcast_flow_to_group(Flow.t(), Group.t(), map()) ::
          {:ok, MessageBroadcast.t()} | {:error, String.t()}
  def broadcast_flow_to_group(flow, group, default_results \\ %{}) do
    organization_id = group.organization_id

    # a failed cache lookup or message insert does not match and raises here
    {:ok, published_flow} =
      Flows.get_cached_flow(organization_id, {:flow_id, flow.id, @status})

    announcement_attrs = %{
      body: "Starting flow: #{published_flow.name} for group: #{group.label}",
      type: :text,
      group_id: group.id
    }

    {:ok, announcement} = Messages.create_group_message(announcement_attrs)

    broadcast_attrs = %{
      group_id: group.id,
      message_id: announcement.id,
      started_at: DateTime.utc_now(),
      user_id: Repo.get_current_user().id,
      organization_id: organization_id,
      flow_id: published_flow.id,
      type: "flow",
      default_results: default_results
    }

    init_msg_broadcast(broadcast_attrs, announcement)
  end

  @doc """
  Queues an already created group message for delivery to all the contacts of a group.

  Builds a `"message"` typed attribute map, carrying `message_params` and
  `default_results`, and hands it to `init_msg_broadcast/2`, which creates the
  broadcast and its contact rows.
  """
  @spec broadcast_message_to_group(Messages.Message.t(), Group.t(), map(), map()) ::
          {:ok, MessageBroadcast.t()} | {:error, String.t()}
  def broadcast_message_to_group(group_message, group, message_params, default_results \\ %{}) do
    broadcast_attrs = %{
      group_id: group.id,
      message_id: group_message.id,
      started_at: DateTime.utc_now(),
      user_id: Repo.get_current_user().id,
      organization_id: group.organization_id,
      message_params: message_params,
      type: "message",
      default_results: default_results
    }

    init_msg_broadcast(broadcast_attrs, group_message)
  end

  @doc """
  Runs one round of broadcast processing for an organization.

  First closes every broadcast that has no unprocessed contact left, then picks
  a single still-open broadcast (if any) and processes its next batch.
  """
  @spec execute_broadcasts(any) :: :ok
  def execute_broadcasts(org_id) do
    mark_broadcast_completed(org_id)

    next_broadcast = unprocessed_group_broadcast(org_id)
    process_broadcast_group(next_broadcast)
  end

  @doc """
  Starts a flow for an explicit list of contacts; this is the entry point used by the flows.

  Sets the process state for the flow's organization, builds the processing
  options for that organization (adding `default_results`) and dispatches the
  contacts. No `:message_broadcast_id` is put into the options here.
  """
  @spec broadcast_contacts(
          atom | %{:organization_id => non_neg_integer, optional(any) => any},
          [
            Glific.Contacts.Contact.t()
          ],
          map()
        ) :: :ok
  def broadcast_contacts(flow, contacts, default_results \\ %{}) do
    organization_id = flow.organization_id
    Repo.put_process_state(organization_id)

    task_opts = Keyword.put(opts(organization_id), :default_results, default_results)

    broadcast_for_contacts(%{flow: flow, type: :flow}, contacts, task_opts)
  end

  @doc """
  Processes the next batch of unprocessed contacts of the given broadcast.

  * `nil` - nothing to do
  * a broadcast whose type is `"message"` - sends the stored message params to each contact
  * any other broadcast - starts the broadcast's published flow for each contact
  """
  @spec process_broadcast_group(MessageBroadcast.t() | nil) :: :ok
  def process_broadcast_group(nil), do: :ok

  def process_broadcast_group(%{type: "message"} = message_broadcast) do
    organization_id = message_broadcast.organization_id
    Repo.put_process_state(organization_id)

    opts = Keyword.put(opts(organization_id), :message_broadcast_id, message_broadcast.id)
    contacts = unprocessed_contacts(message_broadcast)

    # the stored params are atomized, then tagged with the broadcast they belong to
    message_params =
      message_broadcast.message_params
      |> Glific.atomize_keys()
      |> Map.merge(%{
        message_broadcast_id: message_broadcast.id,
        organization_id: organization_id,
        group_id: message_broadcast.group_id,
        publish?: false
      })

    broadcast_for_contacts(%{message_params: message_params, type: :message}, contacts, opts)

    :ok
  end

  def process_broadcast_group(message_broadcast) do
    organization_id = message_broadcast.organization_id
    Repo.put_process_state(organization_id)

    opts =
      organization_id
      |> opts()
      |> Keyword.put(:default_results, message_broadcast.default_results)
      |> Keyword.put(:message_broadcast_id, message_broadcast.id)

    contacts = unprocessed_contacts(message_broadcast)

    flow_key = {:flow_id, message_broadcast.flow_id, @status}
    {:ok, flow} = Flows.get_cached_flow(organization_id, flow_key)

    broadcast_for_contacts(%{flow: flow, type: :flow}, contacts, opts)

    :ok
  end

  @doc """
  Sets `completed_at` on every open broadcast of the organization that has no
  broadcast contact left with an empty `processed_at`.
  """
  @spec mark_broadcast_completed(non_neg_integer()) :: :ok
  def mark_broadcast_completed(org_id) do
    # open broadcasts of this organization for which the correlated sub-query
    # (contacts still waiting to be processed) returns no row
    finished_broadcasts =
      from(fb in MessageBroadcast,
        as: :message_broadcast,
        where:
          fb.organization_id == ^org_id and is_nil(fb.completed_at) and
            not exists(
              from(
                fbc in MessageBroadcastContact,
                where:
                  parent_as(:message_broadcast).id == fbc.message_broadcast_id and
                    is_nil(fbc.processed_at),
                select: 1
              )
            )
      )

    Repo.update_all(finished_broadcasts, set: [completed_at: DateTime.utc_now()])

    :ok
  end

  @doc """
  Returns the contact ids of all the broadcast contact rows that belong to the
  given broadcast.
  """
  @spec get_broadcast_contact_ids(MessageBroadcast.t()) :: list()
  def get_broadcast_contact_ids(message_broadcast) do
    contact_ids_query =
      from(fbc in MessageBroadcastContact,
        where: fbc.message_broadcast_id == ^message_broadcast.id,
        select: fbc.contact_id
      )

    Repo.all(contact_ids_query)
  end

  # Builds the keyword options used to process a list of contacts or a group.
  #
  # :bsp_limit is 80% of the organization's configured "bsp_limit" key (30 is used
  # when the key is not set), leaving room for replies to come in and be processed.
  # :limit, :offset and :delay are fixed starting values.
  @spec opts(non_neg_integer) :: Keyword.t()
  defp opts(organization_id) do
    organization = Partners.organization(organization_id)

    configured_limit =
      case organization.services["bsp"].keys["bsp_limit"] do
        nil -> 30
        limit -> limit
      end

    # Never go below 1: the value is used as the chunk size in broadcast_for_contacts/3
    # and Enum.chunk_every/2 only accepts a positive count. A configured limit of 0 or 1
    # would otherwise round down to 0 and crash the broadcast.
    bsp_limit = max(div(configured_limit * 80, 100), 1)

    [
      bsp_limit: bsp_limit,
      limit: 500,
      offset: 0,
      delay: 0
    ]
  end

  # Returns the most recently inserted broadcast of the organization that has no
  # completed_at yet, with its :flow association preloaded.
  # Returns nil when the organization has no open broadcast (Repo.one/1 yields nil and
  # the preload passes it through), which is why nil is part of the spec.
  @spec unprocessed_group_broadcast(non_neg_integer) :: MessageBroadcast.t() | nil
  defp unprocessed_group_broadcast(organization_id) do
    from(fb in MessageBroadcast,
      where:
        fb.organization_id == ^organization_id and
          is_nil(fb.completed_at),
      order_by: [desc: fb.inserted_at],
      limit: 1
    )
    |> Repo.one()
    |> Repo.preload([:flow])
  end

  # Number of contacts to pick up per processing round. Read from the
  # :broadcast_contact_count application setting; falls back to 100 when the setting
  # parses to nil or cannot be parsed.
  @spec broadcast_per_minute_count() :: integer()
  defp broadcast_per_minute_count do
    fallback = 100

    parsed =
      :glific
      |> Application.fetch_env!(:broadcast_contact_count)
      |> Glific.parse_maybe_integer()

    case parsed do
      {:ok, count} when not is_nil(count) -> count
      _other -> fallback
    end
  end

  # Loads the next batch of contacts of the broadcast that are still unprocessed,
  # ordered by contact id and capped at broadcast_per_minute_count/0.
  defp unprocessed_contacts(message_broadcast) do
    batch_size = broadcast_per_minute_count()

    batch_query =
      from([c, _fbc] in broadcast_contacts_query(message_broadcast),
        limit: ^batch_size,
        order_by: [asc: c.id]
      )

    Repo.all(batch_query)
  end

  # Query for the contacts of a broadcast whose broadcast-contact row has no
  # processed_at yet. The join is exposed under the :fbc named binding.
  defp broadcast_contacts_query(message_broadcast) do
    from(c in Contact,
      inner_join: fbc in MessageBroadcastContact,
      as: :fbc,
      on: fbc.contact_id == c.id and fbc.message_broadcast_id == ^message_broadcast.id,
      where: is_nil(fbc.processed_at)
    )
  end

  # Splits the contacts into chunks of opts[:bsp_limit] and processes the chunks one
  # after the other; the contacts inside a chunk are handled in parallel by
  # flow_tasks/3 or message_tasks/3. Each chunk gets its index added to the base delay.
  @spec broadcast_for_contacts(map(), list(Contact.t()), Keyword.t()) :: :ok
  defp broadcast_for_contacts(attrs, contacts, opts) do
    indexed_chunks =
      contacts
      |> Enum.chunk_every(opts[:bsp_limit])
      |> Enum.with_index()

    Enum.each(indexed_chunks, fn {chunk, chunk_index} ->
      task_opts = [
        delay: opts[:delay] + chunk_index,
        message_broadcast_id: opts[:message_broadcast_id],
        default_results: opts[:default_results]
      ]

      case attrs.type do
        :flow -> flow_tasks(attrs.flow, chunk, task_opts)
        _other -> message_tasks(attrs.message_params, chunk, task_opts)
      end
    end)

    :ok
  end

  # Starts the flow for every contact of the chunk, one supervised task per contact.
  #
  # Each task first stamps the broadcast-contact row as "pending" (which also sets
  # processed_at), then initialises the flow context and, when that returns an :ok or
  # :wait tuple, re-stamps the row as "processed". Anything else is only logged, so the
  # row stays "pending".
  # Tasks are not linked to the caller, results are unordered, and a task running longer
  # than 5 seconds is killed.
  @spec flow_tasks(Flow.t(), [Contact.t()], Keyword.t()) :: :ok
  defp flow_tasks(flow, contacts, opts) do
    # identical for every contact of the chunk, so look it up once
    message_broadcast_id = Keyword.get(opts, :message_broadcast_id)

    stream =
      Task.Supervisor.async_stream_nolink(
        Glific.Broadcast.Supervisor,
        contacts,
        fn contact ->
          # each task is its own process and needs its own process state
          Repo.put_process_state(contact.organization_id)

          mark_message_broadcast_contact_processed(message_broadcast_id, contact.id, "pending")

          response = FlowContext.init_context(flow, contact, @status, opts)

          if elem(response, 0) in [:ok, :wait] do
            mark_message_broadcast_contact_processed(
              message_broadcast_id,
              contact.id,
              "processed"
            )
          else
            Logger.info("Could not start the flow for the contact.
               Contact id : #{contact.id} opts: #{inspect(opts)}
               response #{inspect(response)}")
          end

          :ok
        end,
        ordered: false,
        timeout: 5_000,
        on_timeout: :kill_task
      )

    Stream.run(stream)
  end

  # Sends the message to every contact of the chunk, one supervised task per contact.
  #
  # Each task first stamps the broadcast-contact row as "pending" (which also sets
  # processed_at), then sends the message to the contact - as an HSM message when
  # message_params[:is_hsm] is set to something other than nil or false - and on
  # {:ok, _} re-stamps the row as "processed". An {:error, _} result is only logged, so
  # the row stays "pending".
  # Tasks are not linked to the caller, results are unordered, and a task running longer
  # than 5 seconds is killed.
  @spec message_tasks(map(), [Contact.t()], Keyword.t()) :: :ok
  defp message_tasks(message_params, contacts, opts) do
    # identical for every contact of the chunk, so look it up once
    message_broadcast_id = Keyword.get(opts, :message_broadcast_id)

    stream =
      Task.Supervisor.async_stream_nolink(
        Glific.Broadcast.Supervisor,
        contacts,
        fn contact ->
          # each task is its own process and needs its own process state
          Repo.put_process_state(contact.organization_id)

          mark_message_broadcast_contact_processed(message_broadcast_id, contact.id, "pending")

          message_params = Map.put(message_params, :receiver_id, contact.id)

          result =
            if message_params[:is_hsm] in [nil, false],
              do: Messages.create_and_send_message(message_params),
              else: Messages.create_and_send_hsm_message(message_params)

          case result do
            {:ok, _message} ->
              mark_message_broadcast_contact_processed(
                message_broadcast_id,
                contact.id,
                "processed"
              )

            {:error, error} ->
              Logger.info("Could not start the message for the contact.
              Contact id : #{contact.id} opts: #{inspect(opts)}
              error #{inspect(error)}")
          end

          :ok
        end,
        ordered: false,
        timeout: 5_000,
        on_timeout: :kill_task
      )

    Stream.run(stream)
  end

  # Creates the broadcast record, links the group message to it and fills in the
  # broadcast-contact rows. A failure of the first two steps raises (assertive match);
  # a failure while populating the contacts is returned as an error tuple.
  @spec init_msg_broadcast(map(), Messages.Message.t()) ::
          {:ok, MessageBroadcast.t()} | {:error, String.t()}
  defp init_msg_broadcast(broadcast_attrs, group_message) do
    {:ok, message_broadcast} = create_message_broadcast(broadcast_attrs)

    {:ok, _updated_message} =
      Messages.update_message(group_message, %{message_broadcast_id: message_broadcast.id})

    case populate_message_broadcast_contacts(message_broadcast) do
      {:ok, _query_result} -> {:ok, message_broadcast}
      _failure -> {:error, "could not initiate broadcast"}
    end
  end

  # Stamps processed_at (current UTC time) and the given status on the broadcast-contact
  # row(s) matching the broadcast id and contact id.
  # With a nil broadcast id there is no row to track, so nothing is updated.
  @spec mark_message_broadcast_contact_processed(integer() | nil, integer(), String.t()) :: :ok
  defp mark_message_broadcast_contact_processed(nil, _, _status), do: :ok

  defp mark_message_broadcast_contact_processed(message_broadcast_id, contact_id, status) do
    MessageBroadcastContact
    |> where(message_broadcast_id: ^message_broadcast_id, contact_id: ^contact_id)
    |> Repo.update_all(set: [processed_at: DateTime.utc_now(), status: status])

    # update_all returns a {count, result} tuple; return :ok so both clauses honour the spec
    :ok
  end

  # Inserts a new broadcast record built from attrs through the schema changeset.
  @spec create_message_broadcast(map()) ::
          {:ok, MessageBroadcast.t()} | {:error, Ecto.Changeset.t()}
  defp create_message_broadcast(attrs) do
    changeset = MessageBroadcast.changeset(%MessageBroadcast{}, attrs)
    Repo.insert(changeset)
  end

  # Bulk-inserts one 'pending' broadcast-contact row per member of the broadcast's group
  # with a single INSERT ... SELECT. Rows are skipped where status is 'blocked' or the
  # contact has an optout_time. The ids are interpolated straight into the SQL text.
  @spec populate_message_broadcast_contacts(MessageBroadcast.t()) ::
          {:ok, any()} | {:error, any()}
  defp populate_message_broadcast_contacts(message_broadcast) do
    insert_sql = """
    INSERT INTO message_broadcast_contacts
    (message_broadcast_id, status, organization_id, inserted_at, updated_at, contact_id)

    (SELECT #{message_broadcast.id}, 'pending', #{message_broadcast.organization_id}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, contact_id
      FROM contacts_groups left join contacts on contacts.id = contacts_groups.contact_id
      WHERE group_id = #{message_broadcast.group_id} AND (status !=  'blocked') AND (contacts.optout_time is null))
    """

    Repo.query(insert_sql)
  end

  # Builds the raw SQL used for the per-category message stats of one broadcast.
  #
  # The statement reads message_broadcast_contacts, LEFT JOINed to messages on broadcast
  # id and contact id, keeping one row per contact (DISTINCT ON contact_id). Columns are
  # returned in this order: message_id, status, processed_at, message_broadcast_status,
  # bsp_status, errors - msg_deliveries_by_category/1 destructures rows positionally, so
  # the order must not change.
  # NOTE(review): there is no ORDER BY, so when a contact has several messages for the
  # broadcast the row kept by DISTINCT ON is presumably arbitrary - confirm this is fine.
  # The id is interpolated directly into the SQL text; the spec declares it as a
  # non-negative integer.
  @spec broadcast_stats_base_query(non_neg_integer()) :: String.t()
  defp broadcast_stats_base_query(message_broadcast_id) do
    """
    SELECT distinct on (message_broadcast_contacts.contact_id)
    messages.id as message_id,
    messages.status,
    message_broadcast_contacts.processed_at,
    message_broadcast_contacts.status as message_broadcast_status,
    messages.bsp_status,
    messages.errors
    FROM message_broadcast_contacts
    left JOIN messages ON messages.message_broadcast_id = message_broadcast_contacts.message_broadcast_id
    AND messages.contact_id = message_broadcast_contacts.contact_id
    WHERE message_broadcast_contacts.message_broadcast_id = #{message_broadcast_id};
    """
  end

  @doc """
  Collects the delivery stats of a broadcast.

  Starts from a map of zeroed counters (`:success`, `:failed`, `:pending` and the
  nested `:msg_categories`) and passes it through each counting helper in turn.
  Always returns `{:ok, stats}`.
  """
  @spec broadcast_stats(non_neg_integer()) :: {:ok, map()}
  def broadcast_stats(message_broadcast_id) do
    zeroed_stats = %{
      success: 0,
      failed: 0,
      pending: 0,
      msg_categories: %{
        sent: 0,
        read: 0,
        delivered: 0,
        enqueued: 0,
        opted_out: 0,
        error: 0
      }
    }

    # applied in this order, each one receiving the stats built so far
    counters = [
      &count_successful_deliveries/2,
      &count_failed_deliveries/2,
      &count_pending_deliveries/2,
      &count_deliveries_by_category/2
    ]

    stats =
      Enum.reduce(counters, zeroed_stats, fn counter, acc ->
        counter.(acc, message_broadcast_id)
      end)

    {:ok, stats}
  end

  # Stores under :success the number of broadcast contacts that have a processed_at and
  # the "processed" status.
  # Map.put/3 is used on purpose: the stats map arrives with :success already seeded,
  # so Map.put_new/3 would leave the seeded value in place and discard the count.
  @spec count_successful_deliveries(map(), non_neg_integer()) :: map()
  defp count_successful_deliveries(map, message_broadcast_id) do
    count =
      MessageBroadcastContact
      |> where([fbc], fbc.message_broadcast_id == ^message_broadcast_id)
      |> where([fbc], not is_nil(fbc.processed_at))
      |> where([fbc], fbc.status == "processed")
      |> Repo.aggregate(:count)

    Map.put(map, :success, count)
  end

  # Stores under :failed the number of broadcast contacts that have a processed_at but
  # are still in the "pending" status, i.e. they were picked up and never reached
  # "processed".
  # Map.put/3 is used on purpose: the stats map arrives with :failed already seeded,
  # so Map.put_new/3 would leave the seeded value in place and discard the count.
  @spec count_failed_deliveries(map(), non_neg_integer()) :: map()
  defp count_failed_deliveries(map, message_broadcast_id) do
    count =
      MessageBroadcastContact
      |> where([fbc], fbc.message_broadcast_id == ^message_broadcast_id)
      |> where([fbc], not is_nil(fbc.processed_at))
      |> where([fbc], fbc.status == "pending")
      |> Repo.aggregate(:count)

    Map.put(map, :failed, count)
  end

  # Stores under :pending the number of broadcast contacts that have no processed_at yet.
  # The count belongs under :pending (not :failed), and Map.put/3 is needed because the
  # stats map arrives with the key already seeded, which makes Map.put_new/3 a no-op.
  @spec count_pending_deliveries(map(), non_neg_integer()) :: map()
  defp count_pending_deliveries(map, message_broadcast_id) do
    count =
      MessageBroadcastContact
      |> where([fbc], fbc.message_broadcast_id == ^message_broadcast_id)
      |> where([fbc], is_nil(fbc.processed_at))
      |> Repo.aggregate(:count)

    Map.put(map, :pending, count)
  end

  # Adds the per-category message counts under :msg_categories.
  # The computed counts are merged over the category map already present, so seeded
  # defaults that the query does not compute (such as :opted_out) are kept instead of
  # being dropped. When the key is absent the computed counts are stored as they are.
  @spec count_deliveries_by_category(map(), non_neg_integer()) :: map()
  defp count_deliveries_by_category(map, message_broadcast_id) do
    categories = msg_deliveries_by_category(message_broadcast_id)

    Map.update(map, :msg_categories, categories, &Map.merge(&1, categories))
  end

  # Runs the broadcast stats query and counts the returned rows per category:
  # :sent is based on the message status column, every other category on bsp_status.
  @spec msg_deliveries_by_category(non_neg_integer()) :: map()
  defp msg_deliveries_by_category(message_broadcast_id) do
    result = Repo.query!(broadcast_stats_base_query(message_broadcast_id))

    # keep only the two columns the counts are based on
    statuses =
      Enum.map(result.rows, fn row ->
        [_message_id, message_status, _processed_at, _broadcast_status, bsp_status, _errors] =
          row

        {message_status, bsp_status}
      end)

    sent_total = Enum.count(statuses, fn {message_status, _bsp} -> message_status == "sent" end)

    bsp_total = fn wanted ->
      Enum.count(statuses, fn {_message_status, bsp_status} -> bsp_status == wanted end)
    end

    %{
      sent: sent_total,
      read: bsp_total.("read"),
      delivered: bsp_total.("delivered"),
      enqueued: bsp_total.("enqueued"),
      error: bsp_total.("error")
    }
  end
end