Skip to main content

src/fcgi.gleam

//// FastCGI Responder server.

import exception
import fcgi/internal/protocol
import fcgi/internal/responder
import gleam/bit_array
import gleam/bool
import gleam/bytes_tree.{type BytesTree}
import gleam/dict.{type Dict}
import gleam/erlang/atom.{type Atom}
import gleam/erlang/process
import gleam/http
import gleam/http/request.{type Request}
import gleam/http/response.{type Response}
import gleam/int
import gleam/list
import gleam/option.{type Option}
import gleam/otp/actor
import gleam/otp/factory_supervisor
import gleam/otp/static_supervisor
import gleam/otp/supervision
import gleam/result
import gleam/string
import logging

// Default for `body_read_timeout_ms` used by `new`: 30 seconds.
const default_body_read_timeout_ms = 30_000

// Default for `max_body_size` used by `new`: 268_435_456 bytes = 256 MiB.
const default_max_body_size = 268_435_456

// Timeout, in milliseconds, applied by `start_connection_process` to both
// halves of its "report back" / "go" handshake.
const socket_owner_transfer_timeout_ms = 5000

// Initialisation timeout, in milliseconds, handed to the path janitor actor.
const path_janitor_init_timeout_ms = 1000

// Pause, in milliseconds, before `accept_loop` retries after a failed accept.
const accept_retry_backoff_ms = 100

/// Listen address set on a `Builder` via `listen_unix` or `listen_tcp`.
pub opaque type Address {
  /// Unix domain socket at `path`; produced by `listen_unix`.
  PathAddress(path: String)
  /// TCP listener on `host` and `port`; produced by `listen_tcp`.
  TcpAddress(host: String, port: Int)
}

/// Server configuration produced by `new`. Pass it to `start` to begin
/// listening.
///
/// The `address` type parameter is `Nil` for a builder fresh out of `new`
/// and becomes `Address` once `listen_unix` or `listen_tcp` has been
/// applied; `start` only accepts `Builder(Address)`.
pub opaque type Builder(address) {
  Builder(
    // Function invoked for each request.
    handler: Handler,
    // Where to listen; `Nil` until a `listen_*` function sets it.
    address: address,
    // Cap on delivered body bytes; validated by `start` (must be `>= 0`).
    max_body_size: Int,
    // Read timeout in milliseconds; validated by `start` (must be `> 0`).
    body_read_timeout_ms: Int,
  )
}

// Signature of a request handler: it receives the request (whose body is a
// streaming `BodyReader`) together with the CGI `Context`, and returns the
// response to send.
type Handler =
  fn(Request(BodyReader), Context) -> Response(ResponseData)

/// Create a FastCGI server builder around `handler`.
///
/// The handler runs once `Params` has been received in full. The request
/// body arrives incrementally through the `BodyReader` stored in
/// `req.body`: call it for the first `Read` and follow each `consume`
/// continuation, or hand it to `read_all` to buffer everything.
///
/// No listen address is set yet; apply `listen_unix` or `listen_tcp`
/// before `start`.
///
/// Defaults: 256 MiB max body, 30 s body read timeout.
pub fn new(handler: Handler) -> Builder(Nil) {
  Builder(
    address: Nil,
    handler: handler,
    body_read_timeout_ms: default_body_read_timeout_ms,
    max_body_size: default_max_body_size,
  )
}

/// Listen on the Unix domain socket at `path`. Every other setting of
/// `builder` is carried over unchanged.
pub fn listen_unix(
  builder: Builder(address),
  path: String,
) -> Builder(Address) {
  let Builder(handler:, max_body_size:, body_read_timeout_ms:, ..) = builder
  Builder(
    handler:,
    address: PathAddress(path:),
    max_body_size:,
    body_read_timeout_ms:,
  )
}

/// Listen on TCP `host` and `port`. `host` must be a numeric IP literal,
/// either IPv4 (e.g. `"127.0.0.1"`, `"0.0.0.0"`) or IPv6 (e.g. `"::1"`,
/// `"::"`). Every other setting of `builder` is carried over unchanged.
pub fn listen_tcp(
  builder: Builder(address),
  host host: String,
  port port: Int,
) -> Builder(Address) {
  let Builder(handler:, max_body_size:, body_read_timeout_ms:, ..) = builder
  Builder(
    handler:,
    address: TcpAddress(host:, port:),
    max_body_size:,
    body_read_timeout_ms:,
  )
}

/// Cap the total number of body bytes delivered to the handler across all
/// body reads. The value must be `>= 0`; `start` rejects negative values
/// with `InvalidMaxBodySize(bytes)`.
///
/// Once the peer has sent more than this many bytes, the next body read
/// yields `Error(BodyTooLarge)`. The handler is free to respond however it
/// likes, but the connection is closed after the response is sent because
/// the remaining body bytes cannot be safely drained.
///
/// Default: 256 MiB.
pub fn max_body_size(
  builder: Builder(address),
  bytes: Int,
) -> Builder(address) {
  Builder(
    handler: builder.handler,
    address: builder.address,
    max_body_size: bytes,
    body_read_timeout_ms: builder.body_read_timeout_ms,
  )
}

/// Configure how long the server waits for the next stdin or params record
/// before giving up. The value must be `> 0`. The timeout applies between
/// successive socket reads (including the wait for the first record after
/// `accept`), not to the request as a whole. Body readers observe an
/// expiry as `Error(ReadTimeout)`. Default: 30,000 ms.
pub fn body_read_timeout(
  builder: Builder(address),
  milliseconds: Int,
) -> Builder(address) {
  Builder(
    handler: builder.handler,
    address: builder.address,
    max_body_size: builder.max_body_size,
    body_read_timeout_ms: milliseconds,
  )
}

/// Trusted CGI metadata supplied by the upstream proxy and handed to
/// every request handler alongside the `Request`.
///
/// Every named field is `None` when the proxy did not send the
/// corresponding CGI variable.
pub type Context {
  Context(
    /// Client address as reported by the proxy (CGI `REMOTE_ADDR`).
    remote_addr: Option(String),
    /// Client port parsed as `Int` (CGI `REMOTE_PORT`); `None` when the
    /// variable is absent or its value does not parse as an integer.
    remote_port: Option(Int),
    /// Reverse-DNS hostname of the client, when the proxy resolved one
    /// (CGI `REMOTE_HOST`).
    remote_host: Option(String),
    /// Authenticated identity, when the proxy set one (CGI `REMOTE_USER`).
    remote_user: Option(String),
    /// Authentication scheme, e.g. `"Basic"` (CGI `AUTH_TYPE`).
    auth_type: Option(String),
    /// Mount prefix assigned to the app by the proxy (CGI `SCRIPT_NAME`).
    script_name: Option(String),
    /// HTTP protocol version reported by the proxy, e.g. `"HTTP/1.1"`
    /// (CGI `SERVER_PROTOCOL`).
    server_protocol: Option(String),
    /// Identification string from the upstream proxy (CGI `SERVER_SOFTWARE`).
    server_software: Option(String),
    /// Any other CGI variables that are not mapped onto the `Request` or
    /// onto a field above, keyed by their original name as sent.
    /// Typical entries include `"DOCUMENT_ROOT"` and `"REQUEST_URI"`.
    extra: Dict(String, String),
  )
}

/// Streaming reader for the request body. The handler receives one in
/// `req.body`; call it to obtain the first `Read`, then advance via the
/// `consume` continuation returned in each `Chunk`. `read_all` performs
/// this loop and buffers the result.
pub type BodyReader =
  fn() -> Result(Read, ReadError)

/// The successful result of calling a `BodyReader`.
pub type Read {
  /// A chunk of body bytes plus a `consume` continuation that returns
  /// the next chunk (or `End`) when called.
  Chunk(data: BitArray, consume: BodyReader)
  /// The body has been fully delivered; there is nothing further to read.
  End
}

/// Why a body read failed.
pub type ReadError {
  /// The connection from the upstream proxy was closed before the body
  /// was fully delivered.
  ClientDisconnected
  /// No chunk arrived within the timeout configured with
  /// `body_read_timeout`.
  ReadTimeout
  /// The body exceeded the limit configured with `max_body_size`.
  BodyTooLarge
  /// The upstream proxy sent `FCGI_ABORT_REQUEST` or an unrecoverable
  /// framing error while the handler was reading the body. The
  /// connection will be closed after the handler returns and the
  /// handler's response, if any, is discarded.
  RequestAborted
}

/// Read the whole body into a single `BytesTree`.
///
/// The server's `max_body_size` setting bounds the result: the underlying
/// reader reports `BodyTooLarge` when the peer exceeds it. Any other read
/// error is returned as-is.
///
/// The buffered length is whatever the upstream proxy actually sent, which
/// may differ from the advertised `CONTENT_LENGTH`.
pub fn read_all(read: BodyReader) -> Result(BytesTree, ReadError) {
  let first = read()
  let empty = bytes_tree.new()
  read_all_loop(first, empty)
}

/// Accumulate chunks into `acc` until the reader reports `End`, or stop at
/// the first read error.
fn read_all_loop(
  read: Result(Read, ReadError),
  acc: BytesTree,
) -> Result(BytesTree, ReadError) {
  case read {
    Ok(Chunk(data: chunk, consume: next)) -> {
      // Pull the next read before appending, as a single tail call.
      let following = next()
      read_all_loop(following, bytes_tree.append(acc, chunk))
    }
    Ok(End) -> Ok(acc)
    Error(reason) -> Error(reason)
  }
}

/// Why `send_file` could not produce a `File` body.
pub type FileError {
  /// No file exists at the given path.
  FileNotFound(path: String)
  /// The process lacks permission to read the file.
  FileAccessDenied(path: String)
  /// The path resolved to a directory, not a file.
  FileIsDirectory(path: String)
  /// Any other filesystem error, with the underlying reason as a
  /// human-readable string.
  FileOther(path: String, reason: String)
  /// `offset` is negative, or `limit` is `Some(n)` with `n < 0`. Carries
  /// the rejected arguments unchanged.
  InvalidRange(offset: Int, limit: Option(Int))
}

/// What the server should send back as a response body. Construct with
/// `bytes`, `string`, `send_file`, or `stream`.
pub opaque type ResponseData {
  // In-memory body.
  Bytes(content: BytesTree)
  // Already-opened file; `length` bytes starting at `offset` are sent and
  // the handle is closed by the server afterwards.
  File(handle: Handle, offset: Int, length: Int)
  // Body produced by calling `producer` with a `StreamSender`.
  Stream(producer: fn(StreamSender) -> Nil)
}

/// Handle passed to a `stream` producer. Use `send_chunk` to emit body
/// bytes; each call writes one or more FastCGI `STDOUT` records on the
/// open connection.
pub opaque type StreamSender {
  // The connection socket and the FastCGI request id the records are
  // tagged with.
  StreamSender(socket: Socket, request_id: Int)
}

/// Wrap an in-memory `BytesTree` as a response body. The whole tree is
/// sent in one or more `STDOUT` records.
pub fn bytes(content: BytesTree) -> ResponseData {
  Bytes(content: content)
}

/// Open the file at `path` and build a response body that streams it via
/// `file:sendfile/5` when the response is sent.
///
/// `offset` is the starting byte; `limit`, when `Some(n)`, caps the number
/// of bytes sent. The resulting length is clamped to what the file holds
/// past `offset`, and never drops below zero.
///
/// Returns a `FileError` when the file is missing, inaccessible, or a
/// directory, and `InvalidRange` when `offset` or `limit` is negative (the
/// range is checked before the file is opened).
pub fn send_file(
  path path: String,
  offset offset: Int,
  limit limit: Option(Int),
) -> Result(ResponseData, FileError) {
  let negative_limit = case limit {
    option.Some(n) -> n < 0
    option.None -> False
  }
  case offset < 0 || negative_limit {
    True -> Error(InvalidRange(offset:, limit:))
    False ->
      open_and_size(path)
      |> result.map(fn(opened) {
        let #(handle, total_size) = opened
        // With no explicit limit the whole file is the ceiling.
        let ceiling = option.unwrap(limit, total_size)
        let length = int.clamp(total_size - offset, min: 0, max: ceiling)
        File(handle:, offset:, length:)
      })
  }
}

/// Wrap `producer` as a streaming response body. After the response
/// headers have been sent the server calls `producer(sender)`; every
/// `send_chunk(sender, data)` call inside it writes one or more `STDOUT`
/// records to the upstream proxy.
///
/// If `producer` panics the panic is caught and the response end records
/// are still emitted, so the upstream proxy sees a clean end-of-request.
pub fn stream(producer: fn(StreamSender) -> Nil) -> ResponseData {
  Stream(producer: producer)
}

/// Wrap a `String` as an in-memory response body.
pub fn string(content: String) -> ResponseData {
  content
  |> bytes_tree.from_string
  |> Bytes
}

/// Write one chunk of body bytes from inside a `Stream` producer.
///
/// Yields `Ok(Nil)` once the chunk has been written and `Error(Nil)` when
/// the underlying socket write fails (for example because the upstream
/// proxy has disconnected). Nothing is written when the encoded chunk is
/// empty.
pub fn send_chunk(sender: StreamSender, data: BytesTree) -> Result(Nil, Nil) {
  let records = responder.encode_stdout_chunk(sender.request_id, data)
  send_if_nonempty(sender.socket, records)
}

/// Why the listener could not start.
pub type StartError {
  /// Wraps a failure from the underlying listener, such as bind or
  /// listen failures, an unparseable TCP host, or a Unix-socket path
  /// that is already in use. Supervisor start-up failures and a failed
  /// socket hand-over are reported through this variant as well.
  ListenerError(reason: String)
  /// `max_body_size` was set to a negative value.
  InvalidMaxBodySize(bytes: Int)
  /// `body_read_timeout` was set to zero or a negative value.
  InvalidBodyReadTimeout(milliseconds: Int)
}

/// Running server returned by `start`. `supervisor` is the handle of the
/// server's supervision tree. `bound_port` is `Some(port)` for a TCP
/// listener (useful when `listen_tcp` was given port `0` and the kernel
/// chose an ephemeral port) and `None` for a Unix-domain listener.
pub type Server {
  Server(supervisor: static_supervisor.Supervisor, bound_port: Option(Int))
}

/// Start the server.
///
/// Validates `max_body_size` and `body_read_timeout`, opens the listening
/// socket, starts the supervision tree, and transfers ownership of the
/// listening socket to the supervisor process.
///
/// Returns `InvalidMaxBodySize` / `InvalidBodyReadTimeout` for rejected
/// settings and `ListenerError` for every listener, supervisor, or socket
/// hand-over failure. On the failure paths handled here the socket is
/// closed (and the Unix socket path removed) before returning.
pub fn start(
  builder: Builder(Address),
) -> Result(actor.Started(Server), StartError) {
  use <- bool.guard(
    when: builder.max_body_size < 0,
    return: Error(InvalidMaxBodySize(builder.max_body_size)),
  )
  use <- bool.guard(
    when: builder.body_read_timeout_ms <= 0,
    return: Error(InvalidBodyReadTimeout(builder.body_read_timeout_ms)),
  )
  let address = builder.address
  use socket <- result.try(listen_on_address(address))
  use bound_port <- result.try(resolve_bound_port(socket, address))
  let factory_name = process.new_name(prefix: "fcgi_server_factory")

  let supervisor =
    build_supervisor(
      address,
      socket,
      factory_name,
      builder.max_body_size,
      builder.body_read_timeout_ms,
      builder.handler,
    )

  use started <- result.try(start_supervisor(supervisor, socket, address))

  case controlling_process(socket, started.pid) {
    Ok(Nil) -> {
      // Report the port that was actually bound: with `listen_tcp` port `0`
      // the configured address would otherwise be logged as `host:0`.
      let listening_on = case address, bound_port {
        TcpAddress(host, _), option.Some(port) -> TcpAddress(host, port)
        _, _ -> address
      }
      logging.log(
        logging.Info,
        "fcgi listening on " <> describe_address(listening_on),
      )
      let server = Server(supervisor: started.data, bound_port:)
      Ok(actor.Started(pid: started.pid, data: server))
    }
    Error(error) -> {
      // Unlink first so shutting the supervisor down does not take the
      // caller with it, then stop it and release the socket.
      process.unlink(started.pid)
      process.send_abnormal_exit(started.pid, atom.create("shutdown"))
      cleanup_socket(socket, address)
      Error(ListenerError(
        "controlling_process failed: " <> describe_transport_error(error),
      ))
    }
  }
}

/// Produce a `supervision.ChildSpecification` so the server can run under
/// an OTP supervisor. Any `StartError` from `start` is reported to the
/// supervisor as `actor.InitFailed` with a descriptive message.
pub fn supervised(
  builder: Builder(Address),
) -> supervision.ChildSpecification(Server) {
  supervision.supervisor(fn() {
    case start(builder) {
      Ok(started) -> Ok(started)
      Error(ListenerError(reason)) -> Error(actor.InitFailed(reason))
      Error(InvalidMaxBodySize(bytes)) ->
        Error(actor.InitFailed(
          "max_body_size must be non-negative; got " <> int.to_string(bytes),
        ))
      Error(InvalidBodyReadTimeout(milliseconds)) ->
        Error(actor.InitFailed(
          "body_read_timeout must be positive; got "
          <> int.to_string(milliseconds),
        ))
    }
  })
}

/// Open the listening socket for `address`, translating transport errors
/// into `ListenerError` messages.
fn listen_on_address(address: Address) -> Result(Socket, StartError) {
  case address {
    PathAddress(path) ->
      case do_listen_unix(path) {
        Ok(socket) -> Ok(socket)
        Error(PathExists(existing)) ->
          Error(ListenerError("socket path already exists: " <> existing))
        Error(other) ->
          Error(ListenerError(
            "listen failed: " <> describe_transport_error(other),
          ))
      }
    TcpAddress(host, port) ->
      case do_listen_tcp(host, port) {
        Ok(socket) -> Ok(socket)
        Error(InvalidHost(rejected)) ->
          Error(ListenerError(
            "host must be a numeric IP literal: " <> rejected,
          ))
        Error(other) ->
          Error(ListenerError(
            "listen failed: " <> describe_transport_error(other),
          ))
      }
  }
}

/// Look up the port a TCP listener is bound to; Unix-domain listeners have
/// none. A failed lookup closes the socket and becomes a `ListenerError`.
fn resolve_bound_port(
  socket: Socket,
  address: Address,
) -> Result(Option(Int), StartError) {
  case address {
    TcpAddress(_, _) ->
      do_socket_port(socket)
      |> result.map(option.Some)
      |> result.map_error(fn(error) {
        close_socket(socket)
        ListenerError(
          "bound port lookup failed: " <> describe_transport_error(error),
        )
      })
    PathAddress(_) -> Ok(option.None)
  }
}

/// Assemble the `RestForOne` supervision tree: an optional path janitor
/// (Unix sockets only), then the connection factory, then the acceptor.
fn build_supervisor(
  address: Address,
  socket: Socket,
  factory_name: ConnectionFactoryName,
  max_body_size: Int,
  body_read_timeout_ms: Int,
  handler: Handler,
) -> static_supervisor.Builder {
  let base = static_supervisor.new(static_supervisor.RestForOne)
  let with_janitor = case address {
    TcpAddress(_, _) -> base
    PathAddress(path) ->
      static_supervisor.add(base, path_janitor_supervised(path))
  }
  let with_factory =
    static_supervisor.add(
      with_janitor,
      connection_factory_supervised(factory_name),
    )
  let acceptor =
    acceptor_supervised(
      socket,
      factory_name,
      max_body_size,
      body_read_timeout_ms,
      handler,
    )
  static_supervisor.add(with_factory, acceptor)
}

/// Child specification for the path janitor, restarted as `Transient`.
fn path_janitor_supervised(
  path: String,
) -> supervision.ChildSpecification(Nil) {
  let janitor = supervision.worker(fn() { start_path_janitor(path) })
  supervision.restart(janitor, supervision.Transient)
}

/// Start the actor that removes the Unix socket `path` when it is told to
/// exit.
///
/// The actor traps exits and selects only trapped-exit messages, mapping
/// each one to `Nil`. Its first message therefore means "an exit signal
/// arrived": it deletes `path` and stops.
fn start_path_janitor(path: String) -> actor.StartResult(Nil) {
  actor.new_with_initialiser(path_janitor_init_timeout_ms, fn(_subject) {
    // Turn exit signals into messages so cleanup can run before stopping.
    process.trap_exits(True)
    let selector =
      process.new_selector()
      |> process.select_trapped_exits(fn(_msg) { Nil })
    // The actor state is the path itself; nothing is returned to the starter.
    actor.initialised(path)
    |> actor.selecting(selector)
    |> actor.returning(Nil)
    |> Ok
  })
  |> actor.on_message(fn(path, _message) {
    delete_path(path)
    actor.stop()
  })
  |> actor.start
}

/// Child specification for the named factory supervisor that spawns one
/// process per connection via `start_connection_process`.
fn connection_factory_supervised(
  name: ConnectionFactoryName,
) -> supervision.ChildSpecification(ConnectionFactory) {
  let factory =
    factory_supervisor.named(
      factory_supervisor.worker_child(start_connection_process),
      name,
    )
  supervision.restart(
    factory_supervisor.supervised(factory),
    supervision.Transient,
  )
}

/// Child specification for the acceptor: a spawned process running
/// `accept_loop` against the named connection factory, restarted as
/// `Transient`.
fn acceptor_supervised(
  socket: Socket,
  factory_name: ConnectionFactoryName,
  max_body_size: Int,
  body_read_timeout_ms: Int,
  handler: Handler,
) -> supervision.ChildSpecification(Nil) {
  let start_acceptor = fn() {
    // Resolve the factory on every (re)start of the acceptor.
    let factory = factory_supervisor.get_by_name(factory_name)
    let run = fn() {
      accept_loop(
        socket,
        factory,
        max_body_size,
        body_read_timeout_ms,
        handler,
      )
    }
    Ok(actor.Started(pid: process.spawn(run), data: Nil))
  }
  supervision.restart(
    supervision.worker(start_acceptor),
    supervision.Transient,
  )
}

/// Start the supervision tree. On failure the listening socket is cleaned
/// up and the actor start error is turned into a `ListenerError`.
fn start_supervisor(
  builder: static_supervisor.Builder,
  socket: Socket,
  address: Address,
) -> Result(actor.Started(static_supervisor.Supervisor), StartError) {
  static_supervisor.start(builder)
  |> result.map_error(fn(error) {
    cleanup_socket(socket, address)
    ListenerError(case error {
      actor.InitTimeout -> "supervisor init timeout"
      actor.InitFailed(reason) -> reason
      actor.InitExited(_) -> "supervisor init exited"
    })
  })
}

/// Close the listening socket and, for a Unix-domain listener, remove its
/// socket path.
fn cleanup_socket(socket: Socket, address: Address) -> Nil {
  close_socket(socket)
  case address {
    TcpAddress(_, _) -> Nil
    PathAddress(path: socket_path) -> delete_path(socket_path)
  }
}

/// Render an address for log output: `unix:<path>` or `<host>:<port>`,
/// with any host containing `:` wrapped in square brackets.
fn describe_address(address: Address) -> String {
  case address {
    TcpAddress(host:, port:) -> {
      let shown_host = case string.contains(host, ":") {
        False -> host
        True -> "[" <> host <> "]"
      }
      string.concat([shown_host, ":", int.to_string(port)])
    }
    PathAddress(path:) -> "unix:" <> path
  }
}

/// Render a socket error for messages: the POSIX reason atom as text, or
/// a generic string for the variants callers are expected to have handled.
fn describe_transport_error(error: SocketError) -> String {
  case error {
    PathExists(_) -> "unexpected transport error"
    InvalidHost(_) -> "unexpected transport error"
    Posix(reason) -> atom.to_string(reason)
  }
}

/// Accept connections forever, handing each one to `start_connection`.
///
/// A `closed` accept error ends the loop; any other accept error is
/// logged and retried after `accept_retry_backoff_ms`.
fn accept_loop(
  listen_socket: Socket,
  factory: ConnectionFactory,
  max_body_size: Int,
  body_read_timeout_ms: Int,
  handler: Handler,
) -> Nil {
  // Handle one accept attempt and decide whether to keep looping.
  let keep_accepting = case accept(listen_socket) {
    Ok(client_socket) -> {
      start_connection(
        client_socket,
        factory,
        max_body_size,
        body_read_timeout_ms,
        handler,
      )
      True
    }
    Error(reason) ->
      case atom.to_string(reason) {
        "closed" -> {
          logging.log(logging.Info, "acceptor stopping: listener closed")
          False
        }
        name -> {
          logging.log(
            logging.Warning,
            "accept failed: " <> name <> ", retrying",
          )
          process.sleep(accept_retry_backoff_ms)
          True
        }
      }
  }

  case keep_accepting {
    False -> Nil
    True ->
      accept_loop(
        listen_socket,
        factory,
        max_body_size,
        body_read_timeout_ms,
        handler,
      )
  }
}

/// Spawn a connection process for `client_socket`, make it the socket's
/// controlling process, and then release it with a `Nil` "go" message.
/// If either step fails the client socket is closed.
fn start_connection(
  client_socket: Socket,
  factory: ConnectionFactory,
  max_body_size: Int,
  body_read_timeout_ms: Int,
  handler: Handler,
) -> Nil {
  let connection =
    Connection(
      socket: client_socket,
      max_body_size: max_body_size,
      body_read_timeout_ms: body_read_timeout_ms,
      handler: handler,
    )
  case factory_supervisor.start_child(factory, connection) {
    Ok(started) -> {
      let owner = started.pid
      case controlling_process(client_socket, owner) {
        Ok(Nil) -> process.send(started.data, Nil)
        Error(_) -> {
          close_socket(client_socket)
          process.send_exit(owner)
        }
      }
    }
    Error(_) -> close_socket(client_socket)
  }
}

/// Spawn the process that serves one connection.
///
/// The child creates a "go" subject and reports it back, then waits up to
/// `socket_owner_transfer_timeout_ms` for the go message before running
/// the connection loop. The socket is closed when the loop ends or when
/// the go message never arrives. The starter waits the same timeout for
/// the report and fails initialisation if none comes.
fn start_connection_process(
  connection: Connection,
) -> actor.StartResult(process.Subject(Nil)) {
  let report_back = process.new_subject()
  let child = fn() {
    let go = process.new_subject()
    process.send(report_back, go)

    let released = process.receive(go, socket_owner_transfer_timeout_ms)
    case released {
      Ok(Nil) -> run_connection_loop(connection, responder.Idle(<<>>))
      Error(Nil) -> Nil
    }
    // Both outcomes end by closing the socket.
    close_socket(connection.socket)
  }
  let pid = process.spawn(child)

  case process.receive(report_back, socket_owner_transfer_timeout_ms) {
    Error(Nil) -> {
      process.send_exit(pid)
      Error(actor.InitFailed("connection process did not report back"))
    }
    Ok(go) -> Ok(actor.Started(pid:, data: go))
  }
}

// Everything a connection process needs: the accepted client socket plus
// the server settings and handler copied from the `Builder`.
type Connection {
  Connection(
    socket: Socket,
    max_body_size: Int,
    body_read_timeout_ms: Int,
    handler: Handler,
  )
}

// Factory supervisor that starts a child from a `Connection` and hands
// back the child's "go" subject (`process.Subject(Nil)`).
type ConnectionFactory =
  factory_supervisor.Supervisor(Connection, process.Subject(Nil))

// Registered name under which the connection factory can be looked up.
type ConnectionFactoryName =
  process.Name(factory_supervisor.Message(Connection, process.Subject(Nil)))

// Result of waiting for the next request on a connection.
type NextRequest {
  // A request's params are complete. Carries the responder state after
  // the step that produced it, the request id, the raw params payload,
  // any events that followed `RequestReady` in the same step, and the
  // `keep_conn` flag reported with the request.
  Ready(
    state: responder.State,
    request_id: Int,
    params: BitArray,
    remaining_events: List(responder.Event),
    keep_conn: Bool,
  )
  // The connection should not be used any further.
  Closed
}

// What `serve_request` asks the connection loop to do after a response
// has been sent successfully.
type AfterResponse {
  // No body snapshot is available; carry on with the state already held.
  Continue
  // A body snapshot exists; the loop passes it to `drain_body_loop`
  // before reusing the connection.
  DrainBody(snapshot: BodySnapshot)
}

/// Serve requests on one connection until it should no longer be used.
///
/// Waits for the next request and serves it. The loop recurses only when
/// serving succeeded and the request was flagged `keep_conn`; every other
/// outcome returns `Nil`.
fn run_connection_loop(connection: Connection, state: responder.State) -> Nil {
  case await_request(connection, state, <<>>) {
    Closed -> Nil
    // From here on `state` is the one carried by `Ready`; it shadows the
    // function parameter.
    Ready(state:, request_id:, params:, remaining_events:, keep_conn:) ->
      case
        serve_request(connection, state, request_id, params, remaining_events)
      {
        Error(_) -> Nil
        Ok(after_response) ->
          case keep_conn, after_response {
            // Without `keep_conn` the connection is finished either way.
            False, Continue -> Nil
            False, DrainBody(_) -> Nil
            True, Continue -> run_connection_loop(connection, state)
            // Drain the body from the snapshot first; the drained state is
            // what the next request starts from.
            True, DrainBody(snap) ->
              case drain_body_loop(snap, connection) {
                Error(_) -> Nil
                Ok(next_state) -> run_connection_loop(connection, next_state)
              }
          }
      }
  }
}

/// Feed `bytes` to the responder state machine and write whatever it
/// wants sent. The write is best-effort: its result is ignored and the
/// step outcome is returned either way.
fn step_and_flush(
  connection: Connection,
  state: responder.State,
  bytes: BitArray,
) -> responder.Outcome {
  let socket = connection.socket
  let limit = connection.max_body_size
  let outcome = responder.step(state, bytes:, max_body_size: limit)
  let _send_result = send_if_nonempty(socket, outcome.outgoing)
  outcome
}

/// Drive the responder until a request is ready or the connection should
/// be closed.
///
/// `pending` holds bytes just read from the socket that have not been fed
/// to the responder yet. Alternates with `wait_for_bytes`, which reads
/// more data and calls back in.
fn await_request(
  connection: Connection,
  state: responder.State,
  pending: BitArray,
) -> NextRequest {
  // Bytes the state itself is already holding on to.
  let buffered = case state {
    responder.Idle(buf) -> buf
    responder.Receiving(recv) -> recv.buffer
  }
  // Nothing new and nothing buffered: stepping would be pointless, so go
  // straight to the socket.
  use <- bool.lazy_guard(
    when: bit_array.byte_size(pending) == 0
      && bit_array.byte_size(buffered) == 0,
    return: fn() { wait_for_bytes(connection, state) },
  )
  let outcome = step_and_flush(connection, state, pending)
  case outcome.events {
    // Only a `RequestReady` at the head of the event list is recognised;
    // the events after it travel with the request.
    [responder.RequestReady(request_id, params, keep_conn), ..rest] ->
      Ready(
        state: outcome.state,
        request_id:,
        params:,
        remaining_events: rest,
        keep_conn:,
      )
    _ ->
      case outcome.next {
        responder.CloseConnection -> Closed
        responder.WaitForMore -> wait_for_bytes(connection, outcome.state)
      }
  }
}

/// Read more bytes from the socket (bounded by `body_read_timeout_ms`) and
/// resume `await_request` with them. A receive error or an empty read
/// yields `Closed`.
fn wait_for_bytes(
  connection: Connection,
  state: responder.State,
) -> NextRequest {
  let received =
    recv(connection.socket, 0, connection.body_read_timeout_ms)
  case received {
    Error(_) -> Closed
    Ok(more) ->
      case bit_array.byte_size(more) {
        0 -> Closed
        _ -> await_request(connection, state, more)
      }
  }
}

/// Run the handler for one request and send its response.
///
/// Returns `Ok(Continue)` or `Ok(DrainBody(snapshot))` when a response was
/// sent and the caller may go on, and `Error(Nil)` when the caller should
/// stop using the connection: the send failed, the request was aborted,
/// or the events delivered with the request already reported
/// `BodyTooLarge`.
fn serve_request(
  connection: Connection,
  state: responder.State,
  request_id: Int,
  params: BitArray,
  events: List(responder.Event),
) -> Result(AfterResponse, Nil) {
  case parse_params(params) {
    // Unusable params: answer 400 without invoking the handler.
    Error(message) -> {
      logging.log(logging.Warning, "rejecting request: " <> message)
      let response = error_response(400, message)
      send_response(connection.socket, request_id, response)
      |> result.replace(Continue)
    }
    Ok(#(req, cgi)) -> {
      // Overflow is judged from the events that arrived with the request,
      // before the handler runs.
      let overflowed = list.contains(events, responder.BodyTooLarge)
      let #(reader, snapshot) = build_body_reader(connection, state, events)
      let req = request.set_body(req, reader)
      let response = run_user_handler(connection.handler, req, cgi)

      // Non-blocking (timeout 0) fetch of the snapshot, if there is a
      // snapshot subject and something has been posted to it.
      let latest =
        snapshot
        |> option.to_result(Nil)
        |> result.try(process.receive(_, 0))
      let aborted =
        latest
        |> result.map(fn(snap) { snap.aborted })
        |> result.unwrap(False)

      // Aborted requests get no response; release any file handle the
      // handler's response is holding.
      use <- bool.lazy_guard(when: aborted, return: fn() {
        dispose_response(response)
        Error(Nil)
      })
      use _ <- result.try(send_response(connection.socket, request_id, response))
      // The response has been sent; now give up on the connection.
      use <- bool.lazy_guard(when: overflowed, return: fn() {
        logging.log(
          logging.Warning,
          "closing connection: body exceeded max_body_size of "
            <> int.to_string(connection.max_body_size)
            <> " bytes",
        )
        Error(Nil)
      })

      case latest {
        Error(_) -> Ok(Continue)
        Ok(snap) -> Ok(DrainBody(snap))
      }
    }
  }
}

/// Decode the FastCGI params payload into a body-less request plus its
/// `Context`, or a human-readable rejection message.
fn parse_params(params: BitArray) -> Result(#(Request(Nil), Context), String) {
  protocol.parse_name_value_pairs(params)
  |> result.replace_error("malformed FastCGI parameters")
  |> result.try(fn(pairs) {
    to_http_request(pairs, Nil)
    |> result.map_error(request_error_message)
  })
}

/// Build a plain-text (UTF-8) response with the given status and message.
fn error_response(status: Int, message: String) -> Response(ResponseData) {
  let body = Bytes(bytes_tree.from_string(message))
  let base = response.new(status)
  let typed =
    response.set_header(base, "content-type", "text/plain; charset=utf-8")
  response.set_body(typed, body)
}

/// Why a set of CGI variables could not be turned into a `Request`.
@internal
pub type RequestError {
  /// `REQUEST_METHOD` was not supplied.
  MissingMethod
  /// `REQUEST_METHOD` was supplied but `http.parse_method` rejected it.
  InvalidMethod(method: String)
}

// Accumulator filled in by `apply_pair` while folding over the CGI
// variables, then turned into a `Request` by `finalize_request`.
type PartialRequest {
  PartialRequest(
    // `REQUEST_METHOD`
    method: Option(String),
    // `HTTPS`
    https: Option(String),
    // `SERVER_NAME`
    server_name: Option(String),
    // `SERVER_PORT`, still unparsed
    server_port_raw: Option(String),
    // `HTTP_HOST`
    http_host: Option(String),
    // `PATH_INFO`
    path: Option(String),
    // `QUERY_STRING`
    query: Option(String),
    // Request headers; each new header is prepended.
    headers: List(#(String, String)),
    // Everything that belongs in the handler's `Context`.
    ctx: Context,
  )
}

/// Convert decoded CGI name/value pairs into a `Request` carrying `body`,
/// together with the `Context` built from the remaining variables.
@internal
pub fn to_http_request(
  pairs: List(#(String, String)),
  body: body,
) -> Result(#(Request(body), Context), RequestError) {
  let empty_ctx =
    Context(
      remote_addr: option.None,
      remote_port: option.None,
      remote_host: option.None,
      remote_user: option.None,
      auth_type: option.None,
      script_name: option.None,
      server_protocol: option.None,
      server_software: option.None,
      extra: dict.new(),
    )
  let initial =
    PartialRequest(
      method: option.None,
      https: option.None,
      server_name: option.None,
      server_port_raw: option.None,
      http_host: option.None,
      path: option.None,
      query: option.None,
      headers: [],
      ctx: empty_ctx,
    )
  let collected =
    list.fold(over: pairs, from: initial, with: fn(acc, pair) {
      apply_pair(acc, pair.0, pair.1)
    })
  finalize_request(collected, body)
}

/// Fold one CGI variable into the accumulator: request-level variables
/// set their field, header-like variables are prepended to `headers`, and
/// anything else is passed on to the context.
fn apply_pair(
  acc: PartialRequest,
  key: String,
  value: String,
) -> PartialRequest {
  // Prepend a header carrying this pair's value.
  let with_header = fn(header_name: String) {
    PartialRequest(..acc, headers: [#(header_name, value), ..acc.headers])
  }
  let present = option.Some(value)
  case key {
    "REQUEST_METHOD" -> PartialRequest(..acc, method: present)
    "HTTPS" -> PartialRequest(..acc, https: present)
    "SERVER_NAME" -> PartialRequest(..acc, server_name: present)
    "SERVER_PORT" -> PartialRequest(..acc, server_port_raw: present)
    // Must stay ahead of the generic `HTTP_` arm below.
    "HTTP_HOST" -> {
      let updated = with_header("host")
      PartialRequest(..updated, http_host: present)
    }
    "PATH_INFO" -> PartialRequest(..acc, path: present)
    "QUERY_STRING" -> PartialRequest(..acc, query: present)
    "CONTENT_TYPE" -> with_header("content-type")
    "CONTENT_LENGTH" -> with_header("content-length")
    "HTTP_" <> suffix ->
      suffix
      |> string.lowercase
      |> string.replace("_", "-")
      |> with_header
    _ -> PartialRequest(..acc, ctx: apply_context_var(acc.ctx, key, value))
  }
}

/// Record one CGI variable in the `Context`: known names set their field
/// (`REMOTE_PORT` only when it parses as an integer), everything else
/// lands in `extra` under its original key.
fn apply_context_var(ctx: Context, key: String, value: String) -> Context {
  let present = option.Some(value)
  case key {
    "REMOTE_ADDR" -> Context(..ctx, remote_addr: present)
    "REMOTE_PORT" -> {
      let port = case int.parse(value) {
        Ok(number) -> option.Some(number)
        Error(_) -> option.None
      }
      Context(..ctx, remote_port: port)
    }
    "REMOTE_HOST" -> Context(..ctx, remote_host: present)
    "REMOTE_USER" -> Context(..ctx, remote_user: present)
    "AUTH_TYPE" -> Context(..ctx, auth_type: present)
    "SCRIPT_NAME" -> Context(..ctx, script_name: present)
    "SERVER_PROTOCOL" -> Context(..ctx, server_protocol: present)
    "SERVER_SOFTWARE" -> Context(..ctx, server_software: present)
    _ -> Context(..ctx, extra: dict.insert(ctx.extra, key, value))
  }
}

/// Turn the accumulated variables into a `Request`.
///
/// Fails with `MissingMethod` / `InvalidMethod` when `REQUEST_METHOD` is
/// absent or unparseable. Otherwise: the scheme comes from `HTTPS`
/// (default HTTP), host and port from `HTTP_HOST` when present and from
/// `SERVER_NAME` (default `"localhost"`) / `SERVER_PORT` if not, and the
/// path from `PATH_INFO` (default `"/"`).
fn finalize_request(
  acc: PartialRequest,
  body: body,
) -> Result(#(Request(body), Context), RequestError) {
  let scheme = case acc.https {
    option.None -> http.Http
    option.Some(flag) -> https_scheme_from_value(flag)
  }
  let server_name = option.unwrap(acc.server_name, "localhost")
  let server_port = case acc.server_port_raw {
    option.Some(raw) -> option.from_result(int.parse(raw))
    option.None -> option.None
  }
  let #(host, port) = case acc.http_host {
    option.None -> #(server_name, server_port)
    option.Some(raw) -> resolve_http_host(raw, server_name, server_port)
  }
  let path = option.unwrap(acc.path, "/")

  case acc.method {
    option.None -> Error(MissingMethod)
    option.Some(method_str) ->
      case http.parse_method(method_str) {
        Error(_) -> Error(InvalidMethod(method_str))
        Ok(method) ->
          Ok(#(
            request.Request(
              method:,
              headers: acc.headers,
              body:,
              scheme:,
              host:,
              port:,
              path:,
              query: acc.query,
            ),
            acc.ctx,
          ))
      }
  }
}

/// Interpret the CGI `HTTPS` variable: an empty value or `off` (in any
/// letter case) means HTTP, anything else means HTTPS.
fn https_scheme_from_value(value: String) -> http.Scheme {
  let normalised = string.lowercase(value)
  case normalised == "" || normalised == "off" {
    True -> http.Http
    False -> http.Https
  }
}

/// Split an `HTTP_HOST` value into host and port. An empty value falls
/// back to the server name and port; a value starting with `[` is handled
/// as a bracketed literal, anything else as `host[:port]`.
fn resolve_http_host(
  raw: String,
  server_name: String,
  server_port: Option(Int),
) -> #(String, Option(Int)) {
  case raw {
    "" -> #(server_name, server_port)
    "[" <> _ -> resolve_bracketed_host(raw, server_port)
    _ -> resolve_unbracketed_host(raw, server_port)
  }
}

/// Parse a bracketed host such as `[::1]` or `[::1]:8080`.
///
/// The returned host has its brackets removed. A port that is missing or
/// does not parse falls back to `server_port`. Input that does not fit
/// either shape (no closing bracket, empty brackets, trailing junk) is
/// returned unchanged as the host.
fn resolve_bracketed_host(
  raw: String,
  server_port: Option(Int),
) -> #(String, Option(Int)) {
  let inner = string.drop_start(raw, 1)
  case string.split_once(inner, on: "]") {
    Error(_) | Ok(#("", _)) -> #(raw, server_port)
    Ok(#(host, "")) -> #(host, server_port)
    Ok(#(host, ":" <> digits)) ->
      case int.parse(digits) {
        Ok(port) -> #(host, option.Some(port))
        Error(_) -> #(host, server_port)
      }
    Ok(_) -> #(raw, server_port)
  }
}

/// Parse an unbracketed host of the form `host` or `host:port`.
///
/// A port that is missing or does not parse falls back to `server_port`.
/// A value with an empty host part, or with more than one `:` (for
/// example an unbracketed IPv6 literal such as `fe80::1`), is not a
/// `host:port` pair and is returned unchanged as the host, so it is never
/// truncated at its first colon.
fn resolve_unbracketed_host(
  raw: String,
  server_port: Option(Int),
) -> #(String, Option(Int)) {
  case string.split_once(raw, ":") {
    Ok(#(host, port_str)) if host != "" ->
      case string.contains(port_str, ":") {
        // Several colons: ambiguous, keep the value whole.
        True -> #(raw, server_port)
        False -> {
          let port =
            int.parse(port_str)
            |> option.from_result
          #(host, option.or(port, server_port))
        }
      }
    _ -> #(raw, server_port)
  }
}

/// Human-readable text for a `RequestError`, used as the 400 response body.
fn request_error_message(error: RequestError) -> String {
  case error {
    InvalidMethod(method: rejected) ->
      string.append("invalid REQUEST_METHOD: ", rejected)
    MissingMethod -> "missing REQUEST_METHOD parameter"
  }
}

/// Call the user's handler, converting a crash into a logged 500
/// response instead of letting it take down the connection process.
fn run_user_handler(
  handler_fn: Handler,
  req: Request(BodyReader),
  ctx: Context,
) -> Response(ResponseData) {
  let outcome = exception.rescue(fn() { handler_fn(req, ctx) })
  case outcome {
    Error(crash) -> {
      logging.log(logging.Error, "handler crashed: " <> string.inspect(crash))
      error_response(500, "internal server error")
    }
    Ok(resp) -> resp
  }
}

/// Release resources held by a response that will not be sent: a `File`
/// body's handle is closed; other body kinds hold nothing to release.
fn dispose_response(response: Response(ResponseData)) -> Nil {
  case response.body {
    Bytes(_) -> Nil
    Stream(_) -> Nil
    File(handle: open_file, ..) -> close_file(open_file)
  }
}

/// Write a complete response (headers, body, end records) for
/// `request_id` to `socket`.
///
/// Returns `Error(Nil)` as soon as a socket write fails. How the body is
/// written depends on its kind; see the branch comments.
fn send_response(
  socket: Socket,
  request_id: Int,
  response: Response(ResponseData),
) -> Result(Nil, Nil) {
  let headers = responder.encode_response_headers(request_id, response)
  let end_records = responder.encode_response_end_records(request_id)
  case response.body {
    // In-memory body: headers, body and end records go out in one write.
    Bytes(data) -> {
      let body = responder.encode_stdout_chunk(request_id, data)
      let combined = bytes_tree.concat([headers, body, end_records])
      send_if_nonempty(socket, combined)
    }
    File(handle, offset, length) -> {
      // The handle is closed when this branch finishes, on success and on
      // every early error return alike.
      use <- exception.defer(fn() { close_file(handle) })
      use _ <- result.try(send_tree(socket, headers))
      use _ <- result.try(send_via_sendfile(
        socket,
        request_id,
        handle,
        offset,
        length,
      ))

      send_tree(socket, end_records)
    }
    Stream(producer) -> {
      use _ <- result.try(send_tree(socket, headers))

      let sender = StreamSender(socket:, request_id:)
      // A crashing producer is logged, not propagated, so the end records
      // below are still sent.
      case exception.rescue(fn() { producer(sender) }) {
        Ok(_) -> Nil
        Error(exception) ->
          logging.log(
            logging.Error,
            "stream producer crashed: " <> string.inspect(exception),
          )
      }

      send_tree(socket, end_records)
    }
  }
}

/// Send `remaining` bytes of `handle`, starting at `offset`, as a series of
/// stdout records.
///
/// Each round writes a record header, up to
/// `protocol.max_record_content_size` bytes of file content, then the
/// padding reported by the header encoder, and recurses on what is left.
/// Stops with `Error(Nil)` on the first failed write.
fn send_via_sendfile(
  socket: Socket,
  request_id: Int,
  handle: Handle,
  offset: Int,
  remaining: Int,
) -> Result(Nil, Nil) {
  case remaining <= 0 {
    True -> Ok(Nil)
    False -> {
      let record_size = int.min(remaining, protocol.max_record_content_size)
      let #(record_header, pad) =
        protocol.encode_stdout_record_header(request_id, record_size)
      use _ <- result.try(send_bits(socket, record_header))
      use _ <- result.try(drain_sendfile_loop(
        socket,
        handle,
        offset,
        record_size,
      ))
      use _ <- result.try(send_padding(socket, pad))
      send_via_sendfile(
        socket,
        request_id,
        handle,
        offset + record_size,
        remaining - record_size,
      )
    }
  }
}

/// Keep calling `sendfile` until `remaining` bytes starting at `offset`
/// have been written to the socket.
///
/// A call that fails, or that reports zero bytes sent, ends the loop with
/// `Error(Nil)` so a short file cannot make it spin forever.
fn drain_sendfile_loop(
  socket: Socket,
  handle: Handle,
  offset: Int,
  remaining: Int,
) -> Result(Nil, Nil) {
  case remaining > 0 {
    False -> Ok(Nil)
    True ->
      case sendfile(handle, socket, offset, remaining) {
        Ok(written) if written != 0 ->
          drain_sendfile_loop(
            socket,
            handle,
            offset + written,
            remaining - written,
          )
        Ok(_) | Error(_) -> Error(Nil)
      }
  }
}

/// Write `padding_length` zero bytes to the socket; a length of zero sends
/// nothing.
fn send_padding(socket: Socket, padding_length: Int) -> Result(Nil, Nil) {
  case padding_length {
    0 -> Ok(Nil)
    _ -> send_bits(socket, <<0:size({ padding_length * 8 })>>)
  }
}

/// Send `bytes` on the socket, skipping the write entirely when the tree
/// holds zero bytes.
fn send_if_nonempty(socket: Socket, bytes: BytesTree) -> Result(Nil, Nil) {
  let is_empty = bytes_tree.byte_size(bytes) == 0
  use <- bool.guard(when: is_empty, return: Ok(Nil))
  send_tree(socket, bytes)
}

/// Immutable cursor captured by the streaming body reader closures. Each
/// read produces an updated copy rather than mutating this value.
type BodyContext {
  BodyContext(
    // Responder state passed into the next protocol step.
    state: responder.State,
    // Body bytes already received but not yet handed to the reader.
    pending: BitArray,
    // True once the events seen so far marked the end of the body.
    finished: Bool,
    // True once the body was reported as too large.
    overflowed: Bool,
    // Connection the reader receives more bytes from.
    connection: Connection,
    // Subject on which the latest `BodySnapshot` is published after each pull.
    snapshot: process.Subject(BodySnapshot),
  )
}

/// Progress of body consumption at a point in time: published on the
/// snapshot subject by the streaming reader and used by `drain_body_loop`
/// to decide whether (and how) to keep reading.
type BodySnapshot {
  BodySnapshot(
    // Responder state after the most recent protocol step.
    state: responder.State,
    // True once the end of the body has been seen.
    finished: Bool,
    // True once the body was reported as too large.
    overflowed: Bool,
    // True once a protocol step asked for the connection to be closed.
    aborted: Bool,
  )
}

/// Swap the snapshot queued on `mailbox` for `snap`.
///
/// A zero-timeout receive first discards whatever message is currently
/// waiting (if any); the new snapshot is then sent regardless of whether
/// anything was removed.
fn replace_snapshot(
  mailbox: process.Subject(BodySnapshot),
  snap: BodySnapshot,
) -> Nil {
  case process.receive(mailbox, 0) {
    Ok(_) | Error(_) -> process.send(mailbox, snap)
  }
}

/// Build the `BodyReader` handed to the user handler.
///
/// When the already-decoded `events` contain the whole body (or an
/// overflow), the reader is served from memory and no snapshot subject is
/// created. Otherwise a streaming reader is returned together with a
/// subject that is seeded with an initial snapshot and later receives the
/// reader's progress.
fn build_body_reader(
  connection: Connection,
  state: responder.State,
  events: List(responder.Event),
) -> #(BodyReader, Option(process.Subject(BodySnapshot))) {
  let #(buffered, ended, overflowed) = collect_body_events(events)
  case ended, overflowed {
    False, False -> {
      let mailbox = process.new_subject()
      let initial =
        BodySnapshot(
          state:,
          finished: False,
          overflowed: False,
          aborted: False,
        )
      process.send(mailbox, initial)
      let start =
        BodyContext(
          state:,
          pending: buffered,
          finished: False,
          overflowed: False,
          connection:,
          snapshot: mailbox,
        )
      #(fn() { next_read(start) }, option.Some(mailbox))
    }
    _, _ -> #(buffered_reader(buffered, overflowed), option.None)
  }
}

/// Summarise a batch of responder events as
/// `#(body_bytes, ended, overflowed)`, starting from an empty body with
/// both flags cleared.
fn collect_body_events(
  events: List(responder.Event),
) -> #(BitArray, Bool, Bool) {
  events |> collect_body_events_loop(<<>>, False, False)
}

/// Accumulator loop behind `collect_body_events`.
///
/// Walks `events` in order: body chunks are appended to `data`, `BodyEnd`
/// sets `ended`, `BodyTooLarge` sets both `ended` and `overflowed`, and
/// `RequestReady` leaves the accumulators untouched.
fn collect_body_events_loop(
  events: List(responder.Event),
  data: BitArray,
  ended: Bool,
  overflowed: Bool,
) -> #(BitArray, Bool, Bool) {
  case events {
    [] -> #(data, ended, overflowed)
    [event, ..remaining] -> {
      let #(next_data, next_ended, next_overflowed) = case event {
        responder.BodyChunk(chunk) -> #(
          <<data:bits, chunk:bits>>,
          ended,
          overflowed,
        )
        responder.BodyEnd -> #(data, True, overflowed)
        responder.BodyTooLarge -> #(data, True, True)
        responder.RequestReady(_, _, _) -> #(data, ended, overflowed)
      }
      collect_body_events_loop(
        remaining,
        next_data,
        next_ended,
        next_overflowed,
      )
    }
  }
}

/// Build a reader for a body that is already fully in memory.
///
/// An overflowed body always reads as `Error(BodyTooLarge)`, an empty body
/// reads as `End`, and anything else is a single chunk followed by `End`.
fn buffered_reader(data: BitArray, overflowed: Bool) -> BodyReader {
  use <- bool.guard(when: overflowed, return: fn() { Error(BodyTooLarge) })
  case bit_array.byte_size(data) == 0 {
    True -> fn() { Ok(End) }
    False -> fn() { Ok(Chunk(data, fn() { Ok(End) })) }
  }
}

/// Produce the next `Read` for a streaming body.
///
/// Overflow takes priority over everything else. Otherwise buffered bytes
/// are delivered first; with nothing buffered the result is `End` when the
/// body has finished, or a pull from the connection when it has not.
fn next_read(ctx: BodyContext) -> Result(Read, ReadError) {
  let has_pending = bit_array.byte_size(ctx.pending) != 0
  case ctx.overflowed, has_pending, ctx.finished {
    True, _, _ -> Error(BodyTooLarge)
    False, True, _ -> deliver_pending(ctx)
    False, False, True -> Ok(End)
    False, False, False -> pull_more(ctx)
  }
}

/// Hand the buffered bytes to the caller as one `Chunk`. Its `consume`
/// continuation resumes reading from a context whose buffer is empty.
fn deliver_pending(ctx: BodyContext) -> Result(Read, ReadError) {
  let chunk = ctx.pending
  let drained = BodyContext(..ctx, pending: <<>>)
  let consume = fn() { next_read(drained) }
  Ok(Chunk(data: chunk, consume:))
}

/// Receive more bytes from the connection, run them through the protocol
/// step, publish the resulting snapshot, and continue reading.
///
/// A failed receive returns its error immediately, before any snapshot is
/// published. When the step reports the request as aborted the read fails
/// with `RequestAborted` (after the snapshot has been published).
fn pull_more(ctx: BodyContext) -> Result(Read, ReadError) {
  case recv_connection(ctx.connection) {
    Error(reason) -> Error(reason)
    Ok(received) -> {
      let before =
        BodySnapshot(
          state: ctx.state,
          finished: ctx.finished,
          overflowed: ctx.overflowed,
          aborted: False,
        )
      let #(fresh, after) = step_body(before, received, ctx.connection)
      replace_snapshot(ctx.snapshot, after)

      case after.aborted {
        True -> Error(RequestAborted)
        False ->
          next_read(
            BodyContext(
              ..ctx,
              state: after.state,
              pending: <<ctx.pending:bits, fresh:bits>>,
              finished: after.finished,
              overflowed: after.overflowed,
            ),
          )
      }
    }
  }
}

/// Feed freshly received `bytes` through one protocol step and fold the
/// outcome into a new snapshot.
///
/// Returns the body bytes extracted by this step together with the updated
/// snapshot. The `finished`, `overflowed` and `aborted` flags are sticky:
/// once set in `prior` they stay set.
fn step_body(
  prior: BodySnapshot,
  bytes: BitArray,
  connection: Connection,
) -> #(BitArray, BodySnapshot) {
  let stepped = step_and_flush(connection, prior.state, bytes)
  let #(payload, saw_end, saw_overflow) = collect_body_events(stepped.events)
  let close_requested = case stepped.next {
    responder.WaitForMore -> False
    responder.CloseConnection -> True
  }

  #(
    payload,
    BodySnapshot(
      state: stepped.state,
      finished: saw_end || prior.finished,
      overflowed: saw_overflow || prior.overflowed,
      aborted: close_requested || prior.aborted,
    ),
  )
}

/// Receive the next batch of bytes from the connection, waiting at most the
/// connection's body read timeout.
///
/// A `Posix` error whose reason is the `timeout` atom becomes
/// `ReadTimeout`; every other error, and an empty read, is reported as
/// `ClientDisconnected`.
fn recv_connection(connection: Connection) -> Result(BitArray, ReadError) {
  let received = recv(connection.socket, 0, connection.body_read_timeout_ms)
  case received {
    Ok(<<>>) -> Error(ClientDisconnected)
    Ok(bytes) -> Ok(bytes)
    Error(Posix(reason)) ->
      case atom.to_string(reason) == "timeout" {
        True -> Error(ReadTimeout)
        False -> Error(ClientDisconnected)
      }
    Error(PathExists(_)) | Error(InvalidHost(_)) -> Error(ClientDisconnected)
  }
}

/// Read and discard whatever is left of the request body.
///
/// Returns the final responder state once the body has finished. Gives up
/// with `Error(Nil)` when the snapshot is overflowed or aborted, or when a
/// receive fails.
fn drain_body_loop(
  snap: BodySnapshot,
  connection: Connection,
) -> Result(responder.State, Nil) {
  case snap.overflowed || snap.aborted, snap.finished {
    True, _ -> Error(Nil)
    False, True -> Ok(snap.state)
    False, False ->
      case recv_connection(connection) {
        Ok(received) -> {
          // The extracted body bytes are dropped on purpose: we only drain.
          let #(_discarded, advanced) = step_body(snap, received, connection)
          drain_body_loop(advanced, connection)
        }
        Error(_) -> Error(Nil)
      }
  }
}

/// Opaque file handle produced by `open_and_size` and consumed by
/// `sendfile` / `close_file`. It has no Gleam constructors; values only
/// come from the FFI.
type Handle

/// Opaque socket value (listening or connected) exchanged with the FFI. It
/// has no Gleam constructors.
type Socket

/// Errors returned by the socket FFI functions.
type SocketError {
  // Presumably: the Unix socket path is already present — confirm in fcgi_ffi.
  PathExists(path: String)
  // Presumably: the TCP host could not be parsed/resolved — confirm in fcgi_ffi.
  InvalidHost(host: String)
  // Any other failure, carrying the reason atom (e.g. `timeout`, which
  // `recv_connection` checks for).
  Posix(reason: Atom)
}

/// Accept one incoming connection on a listening socket.
///
/// Binds directly to Erlang's `gen_tcp:accept/1`, so the error is the raw
/// reason atom rather than a `SocketError`.
@external(erlang, "gen_tcp", "accept")
fn accept(listen: Socket) -> Result(Socket, Atom)

/// Close a file handle obtained from `open_and_size`. Always returns `Nil`,
/// so any close failure is not visible to the caller.
@external(erlang, "fcgi_ffi", "close_file")
fn close_file(handle: Handle) -> Nil

/// Close a socket. Always returns `Nil`, so any close failure is not
/// visible to the caller.
@external(erlang, "fcgi_ffi", "close_socket")
fn close_socket(socket: Socket) -> Nil

/// Hand ownership of `socket` to the process `pid`.
///
/// NOTE(review): presumably wraps `gen_tcp:controlling_process/2` — confirm
/// in `fcgi_ffi`.
@external(erlang, "fcgi_ffi", "controlling_process")
fn controlling_process(
  socket: Socket,
  pid: process.Pid,
) -> Result(Nil, SocketError)

/// Remove the filesystem entry at `path`. Always returns `Nil`, so a failed
/// delete is not reported to the caller.
@external(erlang, "fcgi_ffi", "delete_path")
fn delete_path(path: String) -> Nil

/// Open a TCP listening socket on `host` and `port`, returning the socket
/// or a `SocketError` describing why listening failed.
@external(erlang, "fcgi_ffi", "listen_tcp")
fn do_listen_tcp(host: String, port: Int) -> Result(Socket, SocketError)

/// Open a Unix domain listening socket at `path`, returning the socket or a
/// `SocketError` describing why listening failed.
@external(erlang, "fcgi_ffi", "listen_unix")
fn do_listen_unix(path: String) -> Result(Socket, SocketError)

/// Open the file at `path` and return its handle paired with an `Int`,
/// which going by the function name is the file's size (presumably in
/// bytes — confirm in `fcgi_ffi`).
@external(erlang, "fcgi_ffi", "open_and_size")
fn open_and_size(path: String) -> Result(#(Handle, Int), FileError)

/// Receive bytes from `socket`, waiting up to `timeout_ms` milliseconds.
///
/// Callers in this file pass `size: 0` and treat a `Posix` error with the
/// `timeout` atom as a read timeout.
/// NOTE(review): `size: 0` presumably means "whatever is available", as in
/// `gen_tcp:recv` — confirm in `fcgi_ffi`.
@external(erlang, "fcgi_ffi", "recv")
fn recv(
  socket: Socket,
  size: Int,
  timeout_ms: Int,
) -> Result(BitArray, SocketError)

/// Send a `BitArray` on the socket. Shares the `fcgi_ffi:send`
/// implementation with `send_tree`; failure details are collapsed to
/// `Error(Nil)`.
@external(erlang, "fcgi_ffi", "send")
fn send_bits(socket: Socket, data: BitArray) -> Result(Nil, Nil)

/// Send a `BytesTree` on the socket. Shares the `fcgi_ffi:send`
/// implementation with `send_bits`; failure details are collapsed to
/// `Error(Nil)`.
@external(erlang, "fcgi_ffi", "send")
fn send_tree(socket: Socket, data: BytesTree) -> Result(Nil, Nil)

/// Look up the port number associated with `socket`.
///
/// NOTE(review): presumably the locally bound port of a TCP listener —
/// confirm in `fcgi_ffi`.
@external(erlang, "fcgi_ffi", "socket_port")
fn do_socket_port(socket: Socket) -> Result(Int, SocketError)

/// Copy up to `bytes` bytes of the file behind `handle`, starting at
/// `offset`, onto `socket`.
///
/// The `Ok` value is used by `drain_sendfile_loop` as the number of bytes
/// actually sent, which may be fewer than requested; zero is treated there
/// as a failure.
/// NOTE(review): presumably wraps `file:sendfile/5` — confirm in `fcgi_ffi`.
@external(erlang, "fcgi_ffi", "sendfile")
fn sendfile(
  handle: Handle,
  socket: Socket,
  offset: Int,
  bytes: Int,
) -> Result(Int, Nil)