lib/streaming_server.ex

defmodule ExGoogleSTT.StreamingServer do
  @moduledoc """
  A client process for the Streaming API.

  Once a client is started, it establishes a connection to the Streaming API,
  gets ready to send requests to the API and forwards incoming responses to a target process.

  ## Requests

  Requests can be sent using `send_config/3`, `send_request/3` and `send_requests/3`.
  Each request should be a `t:Google.Cloud.Speech.V2.StreamingRecognizeRequest.t/0` struct
  created using `Google.Cloud.Speech.V2.StreamingRecognizeRequest`.
  This is an auto-generated module, so check out [this notice](readme.html#auto-generated-modules) and
  [API reference](https://cloud.google.com/speech-to-text/docs/reference/rpc/google.cloud.speech.v2#google.cloud.speech.v2.StreamingRecognizeRequest)

  ## Responses

  The client sends responses from the API via messages to the target (by default, the process
  that called `start_link/1`).
  Each message is either a `t:Google.Cloud.Speech.V2.StreamingRecognizeResponse.t/0` struct or
  a tuple of the sender's pid and that struct.
  The message format is controlled by the client's `include_sender` option.

  ## Usage

  1. Start the client with `start_link/1`
  1. Send a config request (a `Google.Cloud.Speech.V2.StreamingRecognizeRequest` carrying a
     `Google.Cloud.Speech.V2.StreamingRecognitionConfig`) with `send_config/3`
  1. Send request(s) containing audio data with `send_request/3` or `send_requests/3`
  1. (async) Receive messages containing `Google.Cloud.Speech.V2.StreamingRecognizeResponse`
  1. Send the final audio request with option `end_stream: true`
     or call `end_stream/1` after the final audio chunk has been sent.
  1. Stop the client with `stop/1` after receiving all results

  See [README](readme.html) for a code example.
  """

  use GenServer

  alias ExGoogleSTT.GrpcSpeechClient

  alias Google.Cloud.Speech.V2.{
    StreamingRecognizeRequest,
    StreamingRecognizeResponse
  }

  @typedoc "Format of messages sent by the client to the target"
  @type message :: StreamingRecognizeResponse.t() | {pid(), StreamingRecognizeResponse.t()}

  # ---------------------------------------------------------------------------
  # Client API
  # ---------------------------------------------------------------------------

  @doc """
  Starts a linked client process.

  Possible options are:
  - `target` - A pid of a process that will receive recognition results. Defaults to `self()`
    (the caller of this function).
  - `monitor_target` - If set to `true`, the client will monitor the target and shut down
    when the target goes down. Defaults to `false`.
  - `include_sender` - If `true`, the client will include its pid in messages sent to the
    target. Defaults to `false`.

  Returns `{:error, reason}` if the underlying gRPC connection process cannot be started.
  """
  @spec start_link(options :: Keyword.t()) :: {:ok, pid} | {:error, any()}
  def start_link(options \\ []) do
    # Defaults are resolved here, in the caller, so `self()` is the caller's pid
    # rather than the GenServer's.
    options =
      options
      |> Map.new()
      |> Map.put_new(:target, self())
      |> Map.put_new(:monitor_target, false)
      |> Map.put_new(:include_sender, false)

    GenServer.start_link(__MODULE__, options)
  end

  @doc """
  Stops a client process.
  """
  @spec stop(client :: pid()) :: :ok
  defdelegate stop(pid), to: GenServer

  @doc """
  Sends a config request to the API.

  `cfg_request` must be a `StreamingRecognizeRequest` whose `streaming_request` is
  `{:streaming_config, config}`. If option `end_stream: true` is passed, the client-side
  gRPC stream is closed afterwards.

  Raises `RuntimeError` ("Bad Config Request") for any other request.
  """
  @spec send_config(client :: pid(), StreamingRecognizeRequest.t(), Keyword.t()) :: :ok
  def send_config(_pid, _cfg_request, opts \\ [])

  def send_config(
        pid,
        %StreamingRecognizeRequest{streaming_request: {:streaming_config, _}} = cfg_request,
        opts
      ) do
    GenServer.cast(pid, {:send_requests, [cfg_request], opts})
    :ok
  end

  def send_config(_pid, _cfg_request, _opts), do: raise("Bad Config Request")

  @doc """
  Sends a request to the API. If option `end_stream: true` is passed,
  closes the client-side gRPC stream.
  """
  @spec send_request(client :: pid(), StreamingRecognizeRequest.t(), Keyword.t()) :: :ok
  def send_request(pid, request, opts \\ []) do
    GenServer.cast(pid, {:send_requests, [request], opts})
    :ok
  end

  @doc """
  Sends a list of requests to the API. If option `end_stream: true` is passed,
  closes the client-side gRPC stream.
  """
  @spec send_requests(client :: pid(), [StreamingRecognizeRequest.t()], Keyword.t()) :: :ok
  def send_requests(pid, requests, opts \\ []) do
    GenServer.cast(pid, {:send_requests, requests, opts})
    :ok
  end

  @doc """
  Closes the client-side gRPC stream.
  """
  @spec end_stream(client :: pid()) :: :ok
  def end_stream(pid) do
    GenServer.cast(pid, :end_stream)
    :ok
  end

  # ---------------------------------------------------------------------------
  # Server callbacks
  # ---------------------------------------------------------------------------

  # Starts the gRPC connection process (linked to this server) and, if requested,
  # monitors the target. A failed connection start is reported as `{:stop, reason}`
  # so `start_link/1` returns `{:error, reason}` instead of a MatchError wrapper.
  # NOTE(review): assumes GrpcSpeechClient.start_link/0 returns `{:ok, pid} | {:error, reason}`
  # — any other value raises CaseClauseError, as the original hard match would have crashed.
  @impl true
  def init(opts) do
    case GrpcSpeechClient.start_link() do
      {:ok, conn} ->
        if opts.monitor_target do
          Process.monitor(opts.target)
        end

        {:ok, Map.put(opts, :conn, conn)}

      {:error, reason} ->
        {:stop, reason}
    end
  end

  # Forwards requests (and the optional `end_stream: true` flag) to the gRPC connection;
  # a non-`:ok` reply crashes the server.
  @impl true
  def handle_cast({:send_requests, requests, opts}, state) do
    :ok = GrpcSpeechClient.send_requests(state.conn, requests, opts)
    {:noreply, state}
  end

  @impl true
  def handle_cast(:end_stream, state) do
    :ok = GrpcSpeechClient.end_stream(state.conn)
    {:noreply, state}
  end

  # Relays each recognition response to the target, tagged with this pid when
  # `include_sender` is set.
  # NOTE(review): because handle_info/2 is defined here, `use GenServer`'s default
  # catch-all clause does not apply — any message not matched below raises
  # FunctionClauseError and stops the server.
  @impl true
  def handle_info(%StreamingRecognizeResponse{} = response, state) do
    message = if state.include_sender, do: {self(), response}, else: response
    send(state.target, message)
    {:noreply, state}
  end

  # Only the target is monitored (see init/1); the repeated `pid` binding makes this
  # clause match only when the DOWN message concerns the target.
  @impl true
  def handle_info({:DOWN, _ref, :process, pid, _reason}, %{target: pid} = state) do
    {:stop, :normal, state}
  end

  # Explicitly stops the gRPC connection process when this server terminates.
  @impl true
  def terminate(_reason, state) do
    GrpcSpeechClient.stop(state.conn)
  end
end