Skip to main content

lib/jido/chat/channel_ref.ex

defmodule Jido.Chat.ChannelRef do
  @moduledoc """
  Channel handle for adapter-routed posting, state, and metadata access.
  """

  alias Jido.Chat.{
    Adapter,
    Attachment,
    ChannelInfo,
    FileUpload,
    MessagePage,
    Modal,
    ModalResult,
    PostPayload,
    Postable,
    SentMessage,
    ThreadPage,
    Thread,
    Wire
  }

  @schema Zoi.struct(
            __MODULE__,
            %{
              id: Zoi.string(),
              adapter_name: Zoi.atom(),
              adapter: Zoi.any(),
              external_id: Zoi.any(),
              metadata: Zoi.map() |> Zoi.default(%{})
            },
            coerce: true
          )

  @type t :: unquote(Zoi.type_spec(@schema))

  @enforce_keys Zoi.Struct.enforce_keys(@schema)
  defstruct Zoi.Struct.struct_fields(@schema)

  @doc "Returns the Zoi schema for ChannelRef."
  def schema, do: @schema

  @doc "Creates a channel reference."
  def new(attrs) when is_map(attrs), do: Jido.Chat.Schema.parse!(__MODULE__, @schema, attrs)

  @doc "Posts string/postable/stream content to channel via adapter."
  @spec post(t(), String.t() | Postable.t() | map() | Enumerable.t(), keyword()) ::
          {:ok, SentMessage.t()} | {:error, term()}
  def post(channel, input, opts \\ [])

  def post(%__MODULE__{} = channel, text, opts) when is_binary(text),
    do: text |> PostPayload.text() |> then(&dispatch_post_payload(channel, &1, opts))

  def post(%__MODULE__{} = channel, %Postable{} = postable, opts),
    do: postable |> Postable.to_payload() |> then(&dispatch_post_payload(channel, &1, opts))

  def post(%__MODULE__{} = channel, postable_map, opts) when is_map(postable_map) do
    postable_map
    |> Postable.new()
    |> Postable.to_payload()
    |> then(&dispatch_post_payload(channel, &1, opts))
  rescue
    _ -> {:error, :invalid_postable}
  end

  def post(%__MODULE__{} = channel, enumerable, opts) do
    if Enumerable.impl_for(enumerable) do
      post_stream(channel, enumerable, opts)
    else
      {:error, :invalid_postable}
    end
  end

  @doc "Uploads a file to the channel when supported by the adapter."
  @spec send_file(t(), FileUpload.input(), keyword()) ::
          {:ok, SentMessage.t()} | {:error, term()}
  def send_file(%__MODULE__{} = channel, file, opts \\ []) do
    with {:ok, response} <- Adapter.send_file(channel.adapter, channel.external_id, file, opts) do
      {:ok,
       SentMessage.new(%{
         id: response.external_message_id || Jido.Chat.ID.generate!(),
         thread_id: channel.id,
         adapter: channel.adapter,
         external_room_id: channel.external_id,
         text: opts[:text] || opts[:caption],
         formatted: opts[:text] || opts[:caption],
         raw: response.raw,
         attachments: [Attachment.normalize(file)],
         metadata: opts[:metadata] || %{},
         response: response,
         default_opts: opts
       })}
    end
  end

  @doc "Opens a modal in the channel when supported by the adapter."
  @spec open_modal(t(), Modal.t() | map(), keyword()) :: {:ok, ModalResult.t()} | {:error, term()}
  def open_modal(%__MODULE__{} = channel, payload, opts \\ [])
      when is_map(payload) or is_struct(payload, Modal) do
    Adapter.open_modal(channel.adapter, channel.external_id, payload, opts)
  end

  @doc "Opens a native platform thread from an existing channel message when supported."
  @spec open_thread(t(), String.t() | integer(), keyword()) ::
          {:ok, Thread.t()} | {:error, term()}
  def open_thread(%__MODULE__{} = channel, message_id, opts \\ []) do
    Adapter.open_thread(channel.adapter, channel.external_id, message_id, opts)
  end

  @doc "Posts an ephemeral message via adapter when supported."
  @spec post_ephemeral(t(), String.t() | integer(), String.t() | Postable.t() | map(), keyword()) ::
          {:ok, Jido.Chat.EphemeralMessage.t()} | {:error, term()}
  def post_ephemeral(channel, user_id, input, opts \\ [])

  def post_ephemeral(%__MODULE__{} = channel, user_id, text, opts) when is_binary(text) do
    Adapter.post_ephemeral_message(
      channel.adapter,
      channel.external_id,
      user_id,
      PostPayload.text(text),
      opts
    )
  end

  def post_ephemeral(%__MODULE__{} = channel, user_id, %Postable{} = postable, opts) do
    Adapter.post_ephemeral_message(
      channel.adapter,
      channel.external_id,
      user_id,
      Postable.to_payload(postable),
      opts
    )
  end

  def post_ephemeral(%__MODULE__{} = channel, user_id, postable_map, opts)
      when is_map(postable_map) do
    Adapter.post_ephemeral_message(
      channel.adapter,
      channel.external_id,
      user_id,
      postable_map |> Postable.new() |> Postable.to_payload(),
      opts
    )
  rescue
    _ -> {:error, :invalid_postable}
  end

  @doc "Starts typing indicator on channel when supported."
  @spec start_typing(t(), String.t() | nil) :: :ok | {:error, term()}
  def start_typing(%__MODULE__{} = channel, status \\ nil) do
    opts = if is_binary(status), do: [status: status], else: []
    Adapter.start_typing(channel.adapter, channel.external_id, opts)
  end

  @doc "Renders adapter-specific mention format for a user id."
  @spec mention_user(t(), String.t() | integer()) :: String.t()
  def mention_user(%__MODULE__{adapter_name: :discord}, user_id), do: "<@#{user_id}>"
  def mention_user(%__MODULE__{adapter_name: :telegram}, user_id), do: "@#{user_id}"
  def mention_user(%__MODULE__{}, user_id), do: "@#{user_id}"

  @doc "Gets channel state map or a single key from chat struct state."
  @spec state(Jido.Chat.t(), t(), term() | nil) :: map() | term()
  def state(%Jido.Chat{} = chat, %__MODULE__{} = channel, key \\ nil) do
    channel_state = Jido.Chat.channel_state(chat, channel.id)
    if is_nil(key), do: channel_state, else: Map.get(channel_state, key)
  end

  @doc "Sets channel state in chat struct using :replace, :merge, or key/value modes."
  @spec set_state(Jido.Chat.t(), t(), atom() | term(), map() | term()) :: Jido.Chat.t()
  def set_state(%Jido.Chat{} = chat, %__MODULE__{} = channel, :replace, %{} = value) do
    Jido.Chat.put_channel_state(chat, channel.id, value)
  end

  def set_state(%Jido.Chat{} = chat, %__MODULE__{} = channel, :merge, %{} = value) do
    merged = Map.merge(Jido.Chat.channel_state(chat, channel.id), value)
    Jido.Chat.put_channel_state(chat, channel.id, merged)
  end

  def set_state(%Jido.Chat{} = chat, %__MODULE__{} = channel, key, value) do
    next_state = Map.put(Jido.Chat.channel_state(chat, channel.id), key, value)
    Jido.Chat.put_channel_state(chat, channel.id, next_state)
  end

  @doc "Returns cached channel name from metadata when present."
  @spec name(t()) :: String.t() | nil
  def name(%__MODULE__{} = channel) do
    channel.metadata[:name] || channel.metadata["name"] || channel.metadata[:title] ||
      channel.metadata["title"]
  end

  @doc "Fetches channel metadata as `Jido.Chat.ChannelInfo`."
  @spec fetch_metadata(t(), keyword()) :: {:ok, ChannelInfo.t()} | {:error, term()}
  def fetch_metadata(%__MODULE__{} = channel, opts \\ []) do
    Adapter.fetch_metadata(channel.adapter, channel.external_id, opts)
  end

  @doc "Fetches a page of channel-level messages when supported."
  @spec messages(t(), keyword() | map() | Jido.Chat.FetchOptions.t()) ::
          {:ok, MessagePage.t()} | {:error, term()}
  def messages(%__MODULE__{} = channel, opts \\ []) do
    opts = normalize_fetch_opts(opts)
    Adapter.fetch_channel_messages(channel.adapter, channel.external_id, opts)
  end

  @doc "Lists thread summaries in this channel when supported."
  @spec threads(t(), keyword()) :: {:ok, ThreadPage.t()} | {:error, term()}
  def threads(%__MODULE__{} = channel, opts \\ []) do
    Adapter.list_threads(channel.adapter, channel.external_id, opts)
  end

  @doc "Returns a lazy stream over channel messages using cursor pagination."
  @spec messages_stream(t(), keyword() | map() | Jido.Chat.FetchOptions.t()) :: Enumerable.t()
  def messages_stream(%__MODULE__{} = channel, opts \\ []) do
    base_opts = normalize_fetch_opts(opts)

    Stream.resource(
      fn -> %{channel: channel, opts: base_opts, cursor: nil, pending: [], done?: false} end,
      &next_message_batch/1,
      fn _state -> :ok end
    )
  end

  @doc "Returns a lazy stream over channel thread summaries using cursor pagination."
  @spec threads_stream(t(), keyword()) :: Enumerable.t()
  def threads_stream(%__MODULE__{} = channel, opts \\ []) do
    Stream.resource(
      fn -> %{channel: channel, opts: opts, cursor: nil, pending: [], done?: false} end,
      &next_thread_batch/1,
      fn _state -> :ok end
    )
  end

  @doc "Serializes channel ref into a plain map with type marker."
  @spec to_map(t()) :: map()
  def to_map(%__MODULE__{} = channel) do
    channel
    |> Map.from_struct()
    |> Map.update!(:adapter, &Wire.encode_module/1)
    |> Wire.to_plain()
    |> Map.put("__type__", "channel")
  end

  @doc "Builds a channel ref from serialized map data."
  @spec from_map(map()) :: t()
  def from_map(map) when is_map(map) do
    adapter = map[:adapter] || map["adapter"]

    map
    |> Map.drop(["__type__", :__type__])
    |> Map.delete("adapter")
    |> Map.put(:adapter, Wire.decode_module(adapter))
    |> new()
  end

  defp post_payload(%__MODULE__{} = channel, %PostPayload{} = payload, opts) do
    adapter_opts = Keyword.put(opts, :scope, :channel)

    with {:ok, default_opts} <- post_default_opts(channel.adapter, payload, opts),
         {:ok, response} <-
           Adapter.post_message(channel.adapter, channel.external_id, payload, adapter_opts) do
      {:ok,
       SentMessage.new(%{
         id: response.external_message_id || Jido.Chat.ID.generate!(),
         thread_id: channel.id,
         adapter: channel.adapter,
         external_room_id: channel.external_id,
         text: PostPayload.display_text(payload),
         formatted: payload.formatted || PostPayload.display_text(payload),
         raw: payload.raw,
         attachments: PostPayload.outbound_attachments(payload),
         metadata: payload.metadata,
         response: response,
         default_opts: default_opts
       })}
    end
  end

  defp dispatch_post_payload(%__MODULE__{} = channel, %PostPayload{kind: :stream} = payload, opts)
       when not is_nil(payload.stream) do
    post_stream(channel, payload.stream, opts)
  end

  defp dispatch_post_payload(%__MODULE__{} = channel, %PostPayload{} = payload, opts) do
    post_payload(channel, payload, opts)
  end

  defp post_stream(%__MODULE__{} = channel, enumerable, opts) do
    with {:ok, response} <-
           Adapter.stream(channel.adapter, channel.external_id, enumerable, opts) do
      {:ok,
       SentMessage.new(%{
         id: response.external_message_id || Jido.Chat.ID.generate!(),
         thread_id: channel.id,
         adapter: channel.adapter,
         external_room_id: channel.external_id,
         raw: response.raw,
         metadata: %{stream: true},
         response: response,
         default_opts: opts
       })}
    end
  end

  defp normalize_fetch_opts(%Jido.Chat.FetchOptions{} = opts),
    do: Jido.Chat.FetchOptions.to_keyword(opts)

  defp normalize_fetch_opts(opts) when is_map(opts),
    do: opts |> Jido.Chat.FetchOptions.new() |> Jido.Chat.FetchOptions.to_keyword()

  defp normalize_fetch_opts(opts) when is_list(opts),
    do: opts |> Jido.Chat.FetchOptions.new() |> Jido.Chat.FetchOptions.to_keyword()

  defp normalize_fetch_opts(_other),
    do: Jido.Chat.FetchOptions.to_keyword(Jido.Chat.FetchOptions.new(%{}))

  defp maybe_put_caption(opts, %PostPayload{} = payload) do
    case PostPayload.display_text(payload) do
      nil ->
        opts

      "" ->
        opts

      text ->
        opts
        |> Keyword.put_new(:caption, text)
        |> Keyword.put_new(:text, text)
    end
  end

  defp maybe_put_metadata(opts, metadata) when metadata in [%{}, nil], do: opts

  defp maybe_put_metadata(opts, metadata) when is_map(metadata) do
    Keyword.update(opts, :metadata, metadata, &Map.merge(metadata, &1))
  end

  defp post_default_opts(adapter, %PostPayload{} = payload, opts) do
    upload_candidates = PostPayload.upload_candidates(payload)

    cond do
      function_exported?(adapter, :post_message, 3) ->
        {:ok, opts}

      upload_candidates in [nil, []] ->
        {:ok, opts}

      match?([_attachment], upload_candidates) ->
        {:ok,
         opts
         |> maybe_put_caption(payload)
         |> maybe_put_metadata(payload.metadata)}

      true ->
        {:error, :multiple_attachments_unsupported}
    end
  end

  defp next_message_batch(%{pending: [next | rest]} = state),
    do: {[next], %{state | pending: rest}}

  defp next_message_batch(%{done?: true} = state), do: {:halt, state}

  defp next_message_batch(%{channel: channel, opts: opts, cursor: cursor} = state) do
    request_opts =
      case cursor do
        nil -> opts
        next_cursor -> Keyword.put(opts, :cursor, next_cursor)
      end

    case messages(channel, request_opts) do
      {:ok, %MessagePage{} = page} ->
        pending = page.messages
        done? = is_nil(page.next_cursor)
        next_cursor = page.next_cursor

        case pending do
          [] ->
            if is_nil(next_cursor) do
              {:halt, %{state | done?: true}}
            else
              next_message_batch(%{state | pending: [], cursor: next_cursor, done?: done?})
            end

          [first | rest] ->
            {[first], %{state | pending: rest, cursor: next_cursor, done?: done?}}
        end

      {:error, _reason} ->
        {:halt, %{state | done?: true}}
    end
  end

  defp next_thread_batch(%{pending: [next | rest]} = state),
    do: {[next], %{state | pending: rest}}

  defp next_thread_batch(%{done?: true} = state), do: {:halt, state}

  defp next_thread_batch(%{channel: channel, opts: opts, cursor: cursor} = state) do
    request_opts =
      case cursor do
        nil -> opts
        next_cursor -> Keyword.put(opts, :cursor, next_cursor)
      end

    case threads(channel, request_opts) do
      {:ok, %ThreadPage{} = page} ->
        pending = page.threads
        done? = is_nil(page.next_cursor)
        next_cursor = page.next_cursor

        case pending do
          [] ->
            if is_nil(next_cursor) do
              {:halt, %{state | done?: true}}
            else
              next_thread_batch(%{state | pending: [], cursor: next_cursor, done?: done?})
            end

          [first | rest] ->
            {[first], %{state | pending: rest, cursor: next_cursor, done?: done?}}
        end

      {:error, _reason} ->
        {:halt, %{state | done?: true}}
    end
  end
end