# lib/gen_lsp/buffer.ex

defmodule GenLSP.Buffer do
  @moduledoc """
  The data buffer between the LSP process and the communication channel.

  Outgoing packets are JSON-encoded and written through the configured
  communication adapter. Incoming packets are read by a linked task, decoded,
  and either used to answer a pending `outgoing_sync/3` call or forwarded to
  the LSP process as a request or notification.
  """
  use GenServer

  require Logger

  @options_schema NimbleOptions.new!(
                    communication: [
                      type: :mod_arg,
                      default: {GenLSP.Communication.Stdio, []},
                      doc:
                        "A `{module, args}` tuple, where `module` implements the `GenLSP.Communication.Adapter` behaviour."
                    ],
                    name: [
                      type: :atom,
                      doc:
                        "Used for name registration as described in the \"Name registration\" section in the documentation for `GenServer`."
                    ]
                  )

  @doc """
  Starts a `GenLSP.Buffer` process that is linked to the current process.

  ## Options

  #{NimbleOptions.docs(@options_schema)}
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) do
    opts = NimbleOptions.validate!(opts, @options_schema)
    GenServer.start_link(__MODULE__, opts, Keyword.take(opts, [:name]))
  end

  # Registers `lsp` as the recipient of incoming messages and starts the read loop.
  @doc false
  @spec listen(GenServer.server(), term()) :: :ok
  def listen(server, lsp) do
    GenServer.cast(server, {:listen, lsp})
  end

  # Hands a raw (still JSON-encoded) packet read from the channel to the buffer.
  @doc false
  @spec incoming(GenServer.server(), binary()) :: :ok
  def incoming(server, packet) do
    GenServer.cast(server, {:incoming, packet})
  end

  # Writes `packet` to the channel without waiting for any response.
  @doc false
  @spec outgoing(GenServer.server(), term()) :: :ok
  def outgoing(server, packet) do
    GenServer.cast(server, {:outgoing, packet})
  end

  # Writes `packet` (which must carry an "id") to the channel and blocks until
  # a response with the same id arrives. Returns the response's "result", or
  # its "error" object when the peer answered with an error.
  @doc false
  @spec outgoing_sync(GenServer.server(), map(), timeout()) :: term()
  def outgoing_sync(server, packet, timeout \\ :infinity) do
    GenServer.call(server, {:outgoing_sync, packet}, timeout)
  end

  # Returns the communication adapter's current data.
  @doc false
  @spec comm_state(GenServer.server()) :: term()
  def comm_state(server) do
    GenServer.call(server, :comm_state)
  end

  @doc false
  @impl true
  def init(opts) do
    {comm, comm_args} = opts[:communication]
    {:ok, comm_data} = comm.init(comm_args)

    # `awaiting_response` maps a request id to the `from` of the blocked caller.
    {:ok, %{comm: comm, comm_data: comm_data, awaiting_response: %{}}}
  end

  @doc false
  @impl true
  def handle_call(:comm_state, _from, %{comm_data: comm_data} = state) do
    {:reply, comm_data, state}
  end

  def handle_call({:outgoing_sync, %{"id" => id} = packet}, from, state) do
    :telemetry.span([:gen_lsp, :buffer, :outgoing], %{kind: :sync}, fn ->
      :ok = state.comm.write(Jason.encode!(packet), state.comm_data)
      {:ok, %{}}
    end)

    # The reply is deferred until the matching response shows up in
    # `handle_cast({:incoming, _}, _)`.
    # NOTE(review): if the caller times out first, its entry stays in
    # `awaiting_response` until a response with that id arrives.
    {:noreply, %{state | awaiting_response: Map.put(state.awaiting_response, id, from)}}
  end

  @doc false
  @impl true
  def handle_cast({:incoming, packet}, %{lsp: lsp} = state) do
    state =
      :telemetry.span([:gen_lsp, :buffer, :incoming], %{}, fn ->
        state =
          case Jason.decode!(packet) do
            # Successful response to a request sent via `outgoing_sync/3`.
            %{"id" => id, "result" => result} when is_map_key(state.awaiting_response, id) ->
              reply_to_awaiting(state, id, result)

            # Error response to a request sent via `outgoing_sync/3`. Without
            # this clause the packet would be forwarded to the LSP as if it
            # were a request and the blocked caller would never be answered.
            %{"id" => id, "error" => error} when is_map_key(state.awaiting_response, id) ->
              reply_to_awaiting(state, id, error)

            %{"id" => _} = request ->
              GenLSP.request_server(lsp, request)
              state

            notification ->
              GenLSP.notify_server(lsp, notification)
              state
          end

        {state, %{}}
      end)

    {:noreply, state}
  end

  def handle_cast({:outgoing, packet}, state) do
    :telemetry.span([:gen_lsp, :buffer, :outgoing], %{kind: :async}, fn ->
      :ok = state.comm.write(Jason.encode!(packet), state.comm_data)
      {:ok, %{}}
    end)

    {:noreply, state}
  end

  def handle_cast({:listen, lsp}, state) do
    read(state.comm, state.comm_data)

    {:noreply, Map.put(state, :lsp, lsp)}
  end

  @doc false
  @impl true
  def handle_info({:update_comm_data, comm_data}, state) do
    {:noreply, %{state | comm_data: comm_data}}
  end

  # Answers the caller blocked on request `id` with `payload` and forgets it.
  defp reply_to_awaiting(state, id, payload) do
    {from, awaiting_response} = Map.pop(state.awaiting_response, id)
    GenServer.reply(from, payload)

    %{state | awaiting_response: awaiting_response}
  end

  # Starts a linked task that puts the adapter into listening mode, reports the
  # resulting adapter data back to this process, and then keeps reading
  # messages from the channel, casting each one back as `{:incoming, body}`.
  defp read(comm, comm_data) do
    me = self()

    Task.start_link(fn ->
      {:ok, comm_data} = comm.listen(comm_data)
      send(me, {:update_comm_data, comm_data})

      Stream.resource(
        fn -> "" end,
        fn buffer ->
          case comm.read(comm_data, buffer) do
            :eof ->
              if Application.get_env(:gen_lsp, :exit_on_end, true) do
                System.stop()
              end

              {:halt, :ok}

            {:error, reason} ->
              {:halt, {:error, reason}}

            {:ok, body, buffer} ->
              incoming(me, body)

              {[body], buffer}
          end
        end,
        fn
          {:error, reason} ->
            IO.warn("Unable to read from device: #{inspect(reason)}")

          _ ->
            :ok
        end
      )
      # Drive the stream purely for its side effects. Collecting the emitted
      # bodies into a list would retain every message for the whole session.
      |> Stream.run()
    end)
  end

  # Logs `message` at debug level.
  @doc false
  def log(message) do
    Logger.debug(message)
  end
end