lib/membrane_hackney/source.ex

defmodule Membrane.Hackney.Source do
  @moduledoc """
  This module provides a source element allowing you to receive data as a client
  using HTTP. It is based upon [Hackney](https://github.com/benoitc/hackney)
  library that is responsible for making HTTP requests.

  See the `t:t/0` for the available configuration options.
  """
  use Membrane.Source

  import Mockery.Macro

  require Membrane.Logger
  alias Membrane.{Buffer, RemoteStream, Time}

  @resource_tag :hackney_source_resource

  def_output_pad :output,
    accepted_format: %RemoteStream{type: :bytestream, content_format: nil},
    flow_control: :manual

  def_options location: [
                type: :string,
                description: "The URL to fetch by the element"
              ],
              method: [
                type: :atom,
                spec: :get | :post | :put | :patch | :delete | :head | :options,
                description: "HTTP method that will be used when making a request",
                default: :get
              ],
              body: [
                type: :string,
                description: "The request body",
                default: ""
              ],
              headers: [
                type: :keyword,
                description:
                  "List of additional request headers in format accepted by `:hackney.request/5`",
                default: []
              ],
              hackney_opts: [
                type: :keyword,
                description:
                  "Additional options for Hackney in format accepted by `:hackney.request/5`",
                default: []
              ],
              max_retries: [
                type: :integer,
                spec: non_neg_integer() | :infinity,
                description: """
                Maximum number of retries before returning an error. Can be set to `:infinity`.
                """,
                default: 0
              ],
              retry_delay: [
                type: :time,
                description: """
                Delay between reconnection attempts in case of connection error.
                """,
                default: Time.second()
              ],
              is_live: [
                type: :boolean,
                description: """
                Assume the source is live. If true, when resuming after error,
                the element will not use `Range` header to skip to the
                current position in bytes.
                """,
                default: false
              ]

  @impl true
  def handle_init(_ctx, %__MODULE__{} = options) do
    state =
      options
      |> Map.merge(%{
        async_response: nil,
        retries: 0,
        streaming: false,
        pos_counter: 0
      })

    {[], state}
  end

  @impl true
  def handle_playing(ctx, state) do
    {actions, state} = connect(ctx, state)
    actions = actions ++ [stream_format: {:output, %RemoteStream{type: :bytestream}}]
    {actions, state}
  end

  @impl true
  def handle_demand(:output, _size, _unit, _ctx, %{streaming: true} = state) do
    # We have already requested next frame (using :hackney.stream_next())
    # so we do nothinig
    {[], state}
  end

  def handle_demand(:output, _size, _unit, _ctx, %{async_response: nil} = state) do
    # We're waiting for reconnect
    {[], state}
  end

  def handle_demand(:output, _size, _unit, ctx, state) do
    Membrane.Logger.debug_verbose("Hackney: requesting next chunk")

    case state.async_response |> mockable(:hackney).stream_next() do
      :ok ->
        {[], %{state | streaming: true}}

      {:error, reason} ->
        Membrane.Logger.warning("Hackney.stream_next/1 error: #{inspect(reason)}")

        # Error here is rather caused by library error,
        # so we retry without delay - we will either sucessfully reconnect
        # or will get an error resulting in retry with delay
        retry({:stream_next, reason}, ctx, close_request(ctx, state), false)
    end
  end

  @impl true
  def handle_info({:hackney_response, msg_id, msg}, _ctx, %{async_response: id} = state)
      when msg_id != id do
    Membrane.Logger.warning(
      "Ignoring message #{inspect(msg)} because it does not match current response id: #{inspect(id)}"
    )

    {[], state}
  end

  def handle_info(
        {:hackney_response, id, {:status, code, desc}},
        _ctx,
        %{async_response: id} = state
      )
      when code in [200, 206] do
    Membrane.Logger.debug_verbose("Hackney: Got #{code} #{desc}")
    {[redemand: :output], %{state | streaming: false, retries: 0}}
  end

  def handle_info(
        {:hackney_response, id, {:status, code, _data}},
        ctx,
        %{async_response: id} = state
      )
      when code in [301, 302] do
    Membrane.Logger.warning("""
    Got #{inspect(code)} status indicating redirection.
    If you want to follow add `follow_redirect: true` to :poison_opts
    """)

    retry({:hackney, :redirect}, ctx, close_request(ctx, state))
  end

  def handle_info(
        {:hackney_response, id, {:status, 416, _data}},
        ctx,
        %{async_response: id} = state
      ) do
    Membrane.Logger.warning(
      "Hackney: Got 416 Invalid Range (pos_counter is #{inspect(state.pos_counter)})"
    )

    retry({:hackney, :invalid_range}, ctx, close_request(ctx, state))
  end

  def handle_info(
        {:hackney_response, id, {:status, code, _data}},
        ctx,
        %{async_response: id} = state
      ) do
    Membrane.Logger.warning("Hackney: Got unexpected status code #{code}")
    retry({:http_code, code}, ctx, close_request(ctx, state))
  end

  def handle_info(
        {:hackney_response, id, {:headers, headers}},
        _ctx,
        %{async_response: id} = state
      ) do
    Membrane.Logger.debug_verbose("Hackney: Got headers #{inspect(headers)}")

    {[redemand: :output], %{state | streaming: false}}
  end

  def handle_info(
        {:hackney_response, id, chunk},
        %{playback: :playing},
        %{async_response: id} = state
      )
      when is_binary(chunk) do
    state =
      state
      |> Map.update!(:pos_counter, &(&1 + byte_size(chunk)))

    actions = [buffer: {:output, %Buffer{payload: chunk}}, redemand: :output]
    {actions, %{state | streaming: false}}
  end

  def handle_info({:hackney_response, id, chunk}, _ctx, %{async_response: id} = state)
      when is_binary(chunk) do
    # We received chunk after we've stopped playing. We'll ignore that data.
    {[], %{state | streaming: false}}
  end

  def handle_info({:hackney_response, id, :done}, _ctx, %{async_response: id} = state) do
    new_state = %{state | streaming: false, async_response: nil}
    {[end_of_stream: :output], new_state}
  end

  def handle_info({:hackney_response, id, {:error, reason}}, ctx, %{async_response: id} = state) do
    Membrane.Logger.warning("Hackney error #{inspect(reason)}")

    retry({:hackney, reason}, ctx, close_request(ctx, state))
  end

  def handle_info(
        {:hackney_response, id, {redirect, new_location, _headers}},
        ctx,
        %{async_response: id} = state
      )
      when redirect in [:redirect, :see_other] do
    Membrane.Logger.debug("Hackney: redirecting to #{new_location}")

    state = %{state | location: new_location, streaming: false}
    state = close_request(ctx, state)
    connect(ctx, state)
  end

  def handle_info(:reconnect, ctx, state) do
    connect(ctx, state)
  end

  defp retry(reason, ctx, state, delay? \\ true)

  defp retry(reason, _ctx, %{retries: retries, max_retries: max_retries}, _delay)
       when retries >= max_retries do
    raise "Error: Max retries number reached. Retry reason: #{inspect(reason)}"
  end

  defp retry(_reason, ctx, state, false) do
    connect(ctx, %{state | retries: state.retries + 1})
  end

  defp retry(_reason, _ctx, %{retry_delay: delay, retries: retries} = state, true) do
    Process.send_after(self(), :reconnect, Time.as_milliseconds(delay, :round))
    {[], %{state | retries: retries + 1}}
  end

  defp connect(ctx, state) do
    %{
      method: method,
      location: location,
      body: body,
      headers: headers,
      hackney_opts: opts,
      pos_counter: pos,
      is_live: is_live
    } = state

    opts = opts |> Keyword.merge(stream_to: self(), async: :once)

    headers =
      if pos > 0 and not is_live do
        [{"Range", "bytes=#{pos}-"} | headers]
      else
        headers
      end

    Membrane.Logger.debug(
      "Hackney: connecting, request: #{inspect({method, location, body, headers, opts})}"
    )

    case mockable(:hackney).request(method, location, headers, body, opts) do
      {:ok, async_response} ->
        Membrane.ResourceGuard.register(
          ctx.resource_guard,
          fn -> mockable(:hackney).close(async_response) end,
          tag: @resource_tag
        )

        {[], %{state | async_response: async_response, streaming: true}}

      {:error, reason} ->
        Membrane.Logger.warning("""
        Error while making a request #{inspect({method, location, body, headers, opts})},
        reason #{inspect(reason)}
        """)

        retry({:haceney, reason}, ctx, state)
    end
  end

  defp close_request(_ctx, %{async_response: nil} = state) do
    %{state | streaming: false}
  end

  defp close_request(ctx, state) do
    Membrane.ResourceGuard.cleanup(ctx.resource_guard, @resource_tag)
    %{state | async_response: nil, streaming: false}
  end
end