Skip to main content

lib/jido/chat/x/adapter.ex

defmodule Jido.Chat.X.Adapter do
  @moduledoc "X/Twitter Direct Messages `Jido.Chat.Adapter` implementation."
  use Jido.Chat.Adapter

  alias Jido.Chat.{
    Attachment,
    Author,
    EventEnvelope,
    FileUpload,
    ID,
    Incoming,
    Media,
    Message,
    MessagePage,
    PostPayload,
    Response,
    WebhookRequest,
    WebhookResponse
  }

  alias Jido.Chat.X.Transport.XdkClient

  @impl true
  def channel_type, do: :x

  @impl true
  def capabilities do
    %{
      send_message: :native,
      delete_message: :native,
      fetch_messages: :native,
      webhook: :native,
      verify_webhook: :native,
      parse_event: :native,
      format_webhook_response: :native,
      post_message: :native,
      send_file: :native,
      edit_message: :unsupported,
      start_typing: :unsupported,
      post_ephemeral: :unsupported,
      open_modal: :unsupported,
      fetch_message: :native,
      open_dm: :native,
      markdown: :fallback,
      multi_file: :native
    }
  end

  @impl true
  def transform_incoming(%{"dm_event" => event} = payload),
    do: {:ok, incoming_from_dm(event, payload)}

  def transform_incoming(%{"type" => "MessageCreate"} = event),
    do: {:ok, incoming_from_dm(event, event)}

  def transform_incoming(%{"type" => "message_create"} = event),
    do: {:ok, incoming_from_dm(event, event)}

  def transform_incoming(%{"event_type" => "MessageCreate"} = event),
    do: {:ok, incoming_from_dm(event, event)}

  def transform_incoming(%{"event_type" => "message_create"} = event),
    do: {:ok, incoming_from_dm(event, event)}

  def transform_incoming(_), do: {:error, :unsupported_payload}

  @impl true
  def send_message(room_id, text, opts \\ []) do
    with {:ok, raw} <- send_text(room_id, text, opts) do
      {:ok, response_from_send(raw, room_id)}
    end
  end

  @impl true
  def post_message(room_id, payload, opts \\ [])

  def post_message(room_id, %PostPayload{} = payload, opts) do
    with {:ok, text} <- render_post_text(payload, opts),
         {:ok, raw} <- send_text(room_id, text, opts) do
      {:ok,
       raw
       |> response_from_send(room_id)
       |> put_response_metadata(:attachments, PostPayload.outbound_attachments(payload))}
    end
  end

  def post_message(room_id, payload, opts) when is_map(payload) do
    post_message(room_id, PostPayload.new(payload), opts)
  end

  @impl true
  def send_file(room_id, file, opts \\ []) do
    upload = FileUpload.normalize(file)
    caption = Keyword.get(opts, :caption) || Keyword.get(opts, :text)

    payload =
      PostPayload.new(%{
        kind: :text,
        text: caption,
        files: [upload],
        metadata: Keyword.get(opts, :metadata, %{})
      })

    post_message(room_id, payload, Keyword.drop(opts, [:caption, :text]))
  end

  @impl true
  def delete_message(_room_id, event_id, opts \\ []) do
    transport(opts).delete_dm_event(to_string(event_id), opts)
  end

  @impl true
  def fetch_message(room_id, event_id, opts \\ []) do
    with {:ok, raw} <- transport(opts).fetch_dm_event(to_string(event_id), opts) do
      event = raw["data"] || raw[:data] || raw
      {:ok, message_from_event(event, room_id, raw)}
    end
  end

  @impl true
  def fetch_messages(room_id, opts \\ []) do
    conversation_id = String.replace_prefix(to_string(room_id), "conversation:", "")

    with {:ok, raw} <- transport(opts).fetch_conversation_messages(conversation_id, opts) do
      events = raw["data"] || raw[:data] || raw

      messages =
        events
        |> List.wrap()
        |> Enum.map(&message_from_event(&1, room_id, raw))

      {:ok, MessagePage.new(%{messages: messages, metadata: %{"raw" => raw}})}
    end
  end

  @impl true
  def open_dm(user_id, _opts \\ []), do: {:ok, to_string(user_id)}

  @impl true
  def handle_webhook(%Jido.Chat{} = chat, payload, opts \\ []) when is_map(payload) do
    request =
      WebhookRequest.new(%{
        adapter_name: :x,
        headers: opts[:headers] || %{},
        method: opts[:method] || "POST",
        query: opts[:query] || %{},
        payload: payload,
        raw: opts[:raw_body] || opts[:raw] || payload,
        metadata: %{raw_body: opts[:raw_body] || opts[:raw]}
      })

    with :ok <- verify_webhook(request, opts),
         {:ok, parsed_event} <- parse_event(request, opts) do
      route_parsed_event(chat, parsed_event, opts, request)
    end
  end

  @impl true
  def verify_webhook(request, opts \\ [])

  def verify_webhook(%WebhookRequest{method: method} = request, opts) do
    secret = consumer_secret(opts)

    cond do
      secret in [nil, ""] ->
        {:error, :missing_consumer_secret}

      get_request?(method) and crc_token(request) not in [nil, ""] ->
        :ok

      get_request?(method) ->
        {:error, :missing_crc_token}

      valid_signature?(request, secret) ->
        :ok

      true ->
        {:error, :invalid_signature}
    end
  end

  def verify_webhook(request, opts) when is_map(request) do
    request
    |> WebhookRequest.new()
    |> verify_webhook(opts)
  end

  @impl true
  def parse_event(request, opts \\ [])

  def parse_event(%WebhookRequest{method: method} = request, _opts) do
    case {get_request?(method), dm_events(request.payload)} do
      {true, _events} -> {:ok, :noop}
      {false, [event | _rest]} -> event_envelope(event, request)
      {false, []} -> {:ok, :noop}
    end
  end

  def parse_event(request, opts) when is_map(request) do
    request
    |> WebhookRequest.new()
    |> parse_event(opts)
  end

  @impl true
  def format_webhook_response(%WebhookRequest{method: method} = request, opts)
      when is_binary(method) do
    format_crc_response(request, consumer_secret(opts), get_request?(method))
  end

  def format_webhook_response({:ok, _chat, _incoming}, _opts), do: WebhookResponse.accepted()
  def format_webhook_response({:ok, :noop}, _opts), do: WebhookResponse.accepted()

  def format_webhook_response({:error, reason}, _opts),
    do: WebhookResponse.error(400, inspect(reason))

  def format_webhook_response(_, _opts), do: WebhookResponse.accepted()

  defp send_text(room_id, text, opts) do
    if conversation_room_id?(room_id) do
      transport(opts).send_conversation_message(conversation_id(room_id), to_string(text), opts)
    else
      transport(opts).send_dm(to_string(room_id), to_string(text), opts)
    end
  end

  defp render_post_text(%PostPayload{} = payload, opts) do
    base =
      payload.markdown || payload.formatted || PostPayload.display_text(payload) ||
        payload.fallback_text

    with {:ok, attachment_lines} <- attachment_lines(PostPayload.outbound_attachments(payload)) do
      [reply_context(opts), blank_to_nil(base) | attachment_lines]
      |> render_text_sections()
    end
  end

  defp attachment_lines(attachments) when is_list(attachments) do
    Enum.reduce_while(attachments, {:ok, []}, fn attachment, {:ok, acc} ->
      case attachment_line(Attachment.normalize(attachment)) do
        {:ok, line} -> {:cont, {:ok, [line | acc]}}
        {:error, _reason} = error -> {:halt, error}
      end
    end)
    |> case do
      {:ok, lines} -> {:ok, Enum.reverse(lines)}
      {:error, _reason} = error -> error
    end
  end

  defp attachment_line(%Attachment{url: url} = attachment) when is_binary(url) and url != "" do
    label = attachment.filename || filename_from_url(url) || "attachment"
    {:ok, "#{label}: #{url}"}
  end

  defp attachment_line(%Attachment{}),
    do: {:error, {:unsupported_file_upload, :x_requires_media_upload_or_remote_url}}

  defp reply_context(opts) do
    case Keyword.get(opts, :reply_to_id) || Keyword.get(opts, :quote_id) do
      message_id when message_id in [nil, ""] -> nil
      message_id -> "Replying to #{message_id}:"
    end
  end

  defp render_text_sections(sections) do
    body =
      sections
      |> Enum.reject(&is_nil/1)
      |> Enum.join("\n\n")
      |> String.trim()

    if body == "", do: {:error, :empty_message}, else: {:ok, body}
  end

  defp route_parsed_event(chat, :noop, _opts, %WebhookRequest{} = request) do
    {:ok, chat, synthetic_incoming(request, :noop)}
  end

  defp route_parsed_event(chat, %EventEnvelope{} = envelope, opts, _request) do
    with {:ok, updated_chat, routed_envelope} <- Jido.Chat.process_event(chat, :x, envelope, opts),
         %EventEnvelope{payload: %Incoming{} = incoming} <- routed_envelope do
      {:ok, updated_chat, incoming}
    else
      %EventEnvelope{} -> {:error, :unsupported_event_type}
      {:error, _reason} = error -> error
    end
  end

  defp synthetic_incoming(%WebhookRequest{} = request, event_type) do
    Incoming.new(%{
      external_room_id: "x",
      external_user_id: nil,
      external_message_id: WebhookRequest.header(request, "x-twitter-webhooks-delivery"),
      text: nil,
      raw: request.payload,
      metadata: %{event_type: event_type}
    })
  end

  defp event_envelope(event, %WebhookRequest{} = request) do
    with {:ok, incoming} <-
           transform_incoming(%{"dm_event" => event, "payload" => request.payload}) do
      {:ok,
       EventEnvelope.new(%{
         adapter_name: :x,
         event_type: :message,
         thread_id: thread_id(incoming.external_room_id),
         channel_id: to_string(incoming.external_room_id),
         message_id: to_string(incoming.external_message_id),
         payload: incoming,
         raw: request.payload,
         metadata: %{"for_user_id" => request.payload["for_user_id"]}
       })}
    end
  end

  defp incoming_from_dm(event, payload) do
    context = dm_context(event)
    source_payload = payload["payload"] || payload

    Incoming.new(%{
      external_room_id: "conversation:#{context.conversation_id || context.sender_id}",
      external_thread_id: context.conversation_id && to_string(context.conversation_id),
      external_message_id: context.id && to_string(context.id),
      external_user_id: context.sender_id && to_string(context.sender_id),
      text: context.text,
      timestamp: context.timestamp,
      author: author(context.sender_id),
      chat_type: :direct_message,
      raw: payload,
      media: media_from_event(event, source_payload),
      metadata: %{"conversation_id" => context.conversation_id}
    })
  end

  defp message_from_event(event, room_id, payload) do
    context = dm_context(event)

    Message.new(%{
      id: to_string(context.id || ID.generate!()),
      thread_id: thread_id(room_id),
      channel_id: to_string(room_id),
      text: context.text,
      raw: event,
      author: author(context.sender_id),
      attachments: media_from_event(event, payload),
      created_at: context.timestamp,
      external_message_id: context.id && to_string(context.id),
      external_room_id: room_id
    })
  end

  defp dm_context(event) do
    message_data = get_in(event, ["message_create", "message_data"]) || %{}

    %{
      id: event["id"] || event["dm_event_id"],
      conversation_id:
        event["dm_conversation_id"] || event["conversation_id"] ||
          get_in(event, ["dm_conversation", "id"]),
      sender_id: event["sender_id"] || get_in(event, ["message_create", "sender_id"]),
      text: event["text"] || message_data["text"] || "",
      timestamp: event["created_at"] || event["created_timestamp"]
    }
  end

  defp response_from_send(raw, room_id) do
    data = raw["data"] || raw[:data] || raw

    Response.new(%{
      external_message_id: data["dm_event_id"] || data[:dm_event_id] || data["id"],
      external_room_id:
        "conversation:#{data["dm_conversation_id"] || data[:dm_conversation_id] || room_id}",
      channel_type: :x,
      raw: raw
    })
  end

  defp put_response_metadata(%Response{} = response, key, value) do
    %{response | metadata: Map.put(response.metadata || %{}, key, value)}
  end

  defp dm_events(payload) do
    payload["direct_message_events"] || payload["dm_events"] || payload["data"] || []
  end

  defp media_from_event(event, payload) do
    direct_media(event) ++ included_media(event, payload)
  end

  defp direct_media(event) do
    event
    |> get_in(["message_create", "message_data", "attachment", "media"])
    |> case do
      %{} = media -> [media_from_x_media(media)]
      _ -> []
    end
  end

  defp included_media(event, payload) do
    media_by_key =
      payload
      |> get_in(["includes", "media"])
      |> List.wrap()
      |> Enum.filter(&is_map/1)
      |> Map.new(fn media -> {media["media_key"] || media["id"], media} end)

    event
    |> get_in(["attachments", "media_keys"])
    |> List.wrap()
    |> Enum.flat_map(fn key ->
      case Map.get(media_by_key, key) do
        %{} = media -> [media_from_x_media(media)]
        _ -> []
      end
    end)
  end

  defp media_from_x_media(media) do
    Media.new(%{
      kind: media_kind(media["type"] || media["media_type"]),
      url: media["url"] || media["media_url_https"] || media["preview_image_url"],
      media_type: media["mime_type"],
      width: media["width"],
      height: media["height"],
      duration: media["duration_ms"],
      metadata: %{
        "id" => media["id"],
        "media_key" => media["media_key"],
        "raw" => media
      }
    })
  end

  defp media_kind(type) when type in ["photo", "animated_gif", "image"], do: :image
  defp media_kind(type) when type in ["video"], do: :video
  defp media_kind(type) when type in ["audio"], do: :audio
  defp media_kind(_type), do: :file

  defp format_crc_response(%WebhookRequest{} = request, secret, true) do
    cond do
      secret in [nil, ""] ->
        WebhookResponse.error(400, "missing_consumer_secret")

      crc_token(request) in [nil, ""] ->
        WebhookResponse.error(400, "missing_crc_token")

      true ->
        WebhookResponse.new(%{
          status: 200,
          body: %{"response_token" => "sha256=" <> hmac(secret, crc_token(request))}
        })
    end
  end

  defp format_crc_response(_request, _secret, false), do: WebhookResponse.accepted()

  defp valid_signature?(request, secret) do
    signature = WebhookRequest.header(request, "x-twitter-webhooks-signature")

    signature not in [nil, ""] and
      secure_compare(signature, "sha256=" <> hmac(secret, raw_body(request)))
  end

  defp consumer_secret(opts) do
    Keyword.get(opts, :consumer_secret) || System.get_env("X_CONSUMER_SECRET") ||
      System.get_env("SECRET_KEY")
  end

  defp get_request?(method), do: String.upcase(method || "POST") == "GET"

  defp conversation_room_id?(room_id),
    do: String.starts_with?(to_string(room_id), "conversation:")

  defp conversation_id(room_id),
    do: String.replace_prefix(to_string(room_id), "conversation:", "")

  defp filename_from_url(url) when is_binary(url) do
    url
    |> URI.parse()
    |> Map.get(:path)
    |> case do
      nil -> nil
      "" -> nil
      path -> path |> Path.basename() |> URI.decode()
    end
  rescue
    _ -> nil
  end

  defp filename_from_url(_url), do: nil

  defp blank_to_nil(nil), do: nil
  defp blank_to_nil(""), do: nil
  defp blank_to_nil(value) when is_binary(value), do: value
  defp blank_to_nil(value), do: to_string(value)

  defp author(nil), do: nil

  defp author(user_id),
    do: Author.new(%{user_id: to_string(user_id), user_name: to_string(user_id)})

  defp transport(opts), do: Keyword.get(opts, :transport, XdkClient)
  defp thread_id(room_id), do: "x:#{room_id}"
  defp crc_token(%WebhookRequest{query: query}), do: query["crc_token"] || query[:crc_token]
  defp raw_body(%WebhookRequest{raw: raw}) when is_binary(raw), do: raw
  defp raw_body(%WebhookRequest{metadata: %{"raw_body" => raw}}) when is_binary(raw), do: raw
  defp raw_body(%WebhookRequest{metadata: %{raw_body: raw}}) when is_binary(raw), do: raw
  defp raw_body(%WebhookRequest{payload: payload}), do: Jason.encode!(payload)
  defp hmac(secret, data), do: :crypto.mac(:hmac, :sha256, secret, data) |> Base.encode64()
  defp secure_compare(a, b) when byte_size(a) == byte_size(b), do: :crypto.hash_equals(a, b)
  defp secure_compare(_, _), do: false
end