lib/tds/protocol.ex

defmodule Tds.Protocol do
  @moduledoc """
  Implements DBConnection behaviour for TDS protocol.
  """
  alias Tds.{Parameter, Query}
  import Tds.{BinaryUtils, Messages, Utils}
  require Logger
  use DBConnection

  # Default TCP connect timeout in milliseconds, used when the `:timeout`
  # option is not given.
  @timeout 5_000
  # Base options for the TCP socket; user supplied `:socket_options` are
  # appended after these.
  @sock_opts [packet: :raw, mode: :binary, active: false]
  # Values accepted for the `:set_transaction_isolation_level` option.
  @trans_levels [
    :read_uncommitted,
    :read_committed,
    :repeatable_read,
    :snapshot,
    :serializable
  ]

  # Transport module paired with the socket it operates on.
  @type sock :: {:gen_tcp | :ssl, :gen_tcp.socket() | :ssl.sslsocket()}
  # Per-connection environment handed to the message encoder.
  @type env :: %{
          trans: <<_::8>>,
          savepoint: non_neg_integer,
          collation: Tds.Protocol.Collation.t(),
          packetsize: integer
        }
  @type transaction :: nil | :started | :successful | :failed
  # `:transaction_manager` is set by `send_transaction/3` while a transaction
  # manager request is in flight, so it has to be part of the union.
  @type state ::
          :ready
          | :prelogin
          | :login
          | :prepare
          | :executing
          | :transaction_manager
  @type packet_data :: binary

  @type t :: %__MODULE__{
          sock: nil | sock,
          usock: nil | pid,
          itcp: term,
          opts: nil | Keyword.t(),
          state: state,
          result: nil | list(),
          query: nil | String.t(),
          transaction: transaction,
          env: env
        }

  # Connection state threaded through every callback of this module.
  defstruct sock: nil,
            # UDP socket held only while the instance port is being looked up
            usock: nil,
            # TCP port resolved for a named instance; preferred over `opts[:port]`
            itcp: nil,
            opts: nil,
            # Tells if connection is ready or executing command
            state: :ready,
            # Result of the last processed server message
            result: nil,
            # Query built from the last prepare response
            query: nil,
            transaction: nil,
            # Passed to the message encoder together with each outgoing message
            env: %{
              trans: <<0x00>>,
              savepoint: 0,
              collation: %Tds.Protocol.Collation{},
              packetsize: 4096
            }

  @doc """
  Opens a connection to SQL Server.

  Missing `:username`, `:password`, `:instance` and `:hostname` options are
  filled in from the `MSSQLUSER`/`USER`, `MSSQLPASSWORD`, `MSSQLINSTANCE` and
  `MSSQLHOST` environment variables; options that end up `nil` are dropped.
  When an `:instance` is configured its TCP port is resolved first.

  Returns `{:ok, state}` or `{:error, exception}`.
  """
  @spec connect(opts :: Keyword.t()) :: {:ok, state :: t()} | {:error, Exception.t()}
  def connect(opts) do
    opts =
      opts
      |> Keyword.put_new(:username, System.get_env("MSSQLUSER") || System.get_env("USER"))
      |> Keyword.put_new(:password, System.get_env("MSSQLPASSWORD"))
      |> Keyword.put_new(:instance, System.get_env("MSSQLINSTANCE"))
      |> Keyword.put_new(:hostname, System.get_env("MSSQLHOST") || "localhost")
      |> Enum.reject(fn {_k, v} -> is_nil(v) end)

    s = %__MODULE__{}

    case opts[:instance] do
      nil ->
        connect(opts, s)

      _instance ->
        case instance(opts, s) do
          {:ok, s} ->
            connect(opts, s)

          # instance/2 already answers with `{:error, exception}`; wrapping it
          # again would hand `{:error, {:error, exception}}` to the caller.
          {:error, _exception} = err ->
            err
        end
    end
  end

  @doc """
  Drains a pending socket message, if any, and closes the socket.
  """
  @spec disconnect(err :: Exception.t() | String.t(), state :: t()) :: :ok
  def disconnect(_err, %{sock: {transport, socket}} = state) do
    # Drain first so a message from this socket cannot be picked up by
    # whatever socket the process opens next.
    flush(state)
    transport.close(socket)
  end

  @doc """
  Checks that the connection is alive by running a trivial query.

  Anything other than a successful query result is turned into a
  `:disconnect` tuple.
  """
  @spec ping(t) :: {:ok, t} | {:disconnect, Exception.t(), t}
  def ping(state) do
    reply = send_query(~s{SELECT 'pong' as [msg]}, state)

    case reply do
      {:ok, _, s} ->
        {:ok, s}

      {:disconnect, :closed, s} ->
        {:disconnect, %Tds.Error{message: "Connection closed."}, s}

      {:error, err, s} ->
        # Non-exception reasons are wrapped so the caller always gets an
        # exception struct.
        reason = if Exception.exception?(err), do: err, else: Tds.Error.exception(inspect(err))
        {:disconnect, reason, s}

      any ->
        {:disconnect, Tds.Error.exception(inspect(any)), state}
    end
  end

  @doc """
  Switches the socket to passive mode when the connection is checked out.

  A connection whose transaction status is still `:started` is disconnected.
  """
  @spec checkout(state :: t) ::
          {:ok, new_state :: any} | {:disconnect, Exception.t(), new_state :: t}
  def checkout(%{transaction: :started} = s) do
    err = %Tds.Error{message: "Unexpected transaction status `:started`"}
    {:disconnect, err, s}
  end

  def checkout(%{sock: {mod, _sock}} = s) do
    case setopts(s.sock, active: false) do
      :ok ->
        {:ok, s}

      {:error, reason} ->
        # `inspect/1` keeps this safe for reasons that are not atoms or
        # strings (plain interpolation would raise on a tuple reason).
        msg = "Failed to #{inspect(mod)}.setopts(active: false) due `#{inspect(reason)}`"
        {:disconnect, %Tds.Error{message: msg}, s}
    end
  end

  @doc """
  Switches the socket to `active: :once` when the connection is checked in.

  A connection whose transaction status is still `:started` is disconnected.
  """
  @spec checkin(state :: t) ::
          {:ok, new_state :: t} | {:disconnect, Exception.t(), new_state :: t}
  def checkin(%{transaction: :started} = s) do
    err = %Tds.Error{message: "Unexpected transaction status `:started`"}
    {:disconnect, err, s}
  end

  def checkin(%{sock: {mod, _sock}} = s) do
    case setopts(s.sock, active: :once) do
      :ok ->
        {:ok, s}

      {:error, reason} ->
        # The message names the option that was actually requested, and
        # `inspect/1` keeps it safe for non-atom reasons.
        msg = "Failed to #{inspect(mod)}.setopts(active: :once) due `#{inspect(reason)}`"
        {:disconnect, %Tds.Error{message: msg}, s}
    end
  end

  # Executes `query`, either as a parameterized RPC call or as a plain SQL
  # batch when there are no parameters.
  #
  # `opts[:parameters]` takes precedence over `params`. The `:resultset`
  # option is stored in the process dictionary for the duration of the call,
  # where `message/3` reads it back.
  @spec handle_execute(Tds.Query.t(), DBConnection.params(), Keyword.t(), t) ::
          {:ok, Tds.Query.t(), Tds.Result.t(), new_state :: t}
          | {:error | :disconnect, Exception.t(), new_state :: t}
  def handle_execute(
        %Query{handle: handle, statement: statement} = query,
        params,
        opts,
        %{sock: _sock} = s
      ) do
    params = opts[:parameters] || params
    Process.put(:resultset, Keyword.get(opts, :resultset, false))

    if params != [] do
      send_param_query(query, params, s)
    else
      send_query(statement, s)
    end
  rescue
    # Any exception raised while sending is reported against the state the
    # call started with.
    exception ->
      {:error, exception, s}
  else
    {:ok, result, state} ->
      {:ok, query, result, state}

    other ->
      other
  after
    Process.delete(:resultset)

    # A query carrying a handle is closed right after execution. The value of
    # an `after` block is discarded, so the outcome of this call and the state
    # it returns never reach the caller.
    unless is_nil(handle) do
      handle_close(query, opts, %{s | state: :executing})
    end
  end

  @doc """
  Prepares `query` according to the `:execution_mode` option.

  `:prepare_execute` (the default) sends a prepare request to the server,
  `:executesql` returns the query untouched, anything else is an error.
  """
  @spec handle_prepare(Tds.Query.t(), Keyword.t(), t) ::
          {:ok, Tds.Query.t(), new_state :: t()}
          | {:error | :disconnect, Exception.t(), new_state :: t}
  def handle_prepare(%{statement: statement} = query, opts, s) do
    mode = Keyword.get(opts, :execution_mode, :prepare_execute)

    case mode do
      :executesql ->
        {:ok, query, %{s | state: :executing}}

      :prepare_execute ->
        declared = Parameter.prepared_params(opts[:parameters])
        send_prepare(statement, declared, %{s | state: :prepare})

      unknown ->
        message =
          "Unknown execution mode #{inspect(unknown)}, please check your config." <>
            "Supported modes are :prepare_execute and :executesql"

        {:error, %Tds.Error{message: message}, s}
    end
  end

  @doc """
  Releases the server-side handle of a prepared `query`.
  """
  @spec handle_close(Tds.Query.t(), nil | keyword | map, t()) ::
          {:ok, Tds.Result.t(), new_state :: t()}
          | {:error | :disconnect, Exception.t(), new_state :: t()}
  def handle_close(query, opts, s), do: send_close(query, opts[:parameters], s)

  @doc """
  Begins a transaction or creates a savepoint, depending on the `:mode`
  option (`:transaction` by default).

  When the requested mode does not fit the current transaction status the
  status is reported instead.
  """
  @spec handle_begin(Keyword.t(), t) ::
          {:ok, Tds.Result.t(), new_state :: t}
          | {DBConnection.status(), new_state :: t}
          | {:disconnect, Exception.t(), new_state :: t}
  def handle_begin(opts, %{env: env, transaction: tran} = s) do
    case Keyword.get(opts, :mode, :transaction) do
      :transaction when is_nil(tran) ->
        level = Keyword.get(opts, :isolation_level, :read_committed)

        send_transaction(
          "TM_BEGIN_XACT",
          [isolation_level: level],
          %{s | transaction: :started}
        )

      :savepoint when tran in [:started, :failed] ->
        # Each savepoint is named after an incrementing counter kept in `env`.
        next = env.savepoint + 1
        started = %{s | transaction: :started, env: %{env | savepoint: next}}
        send_transaction("TM_SAVE_XACT", [name: next], started)

      mode when mode in [:transaction, :savepoint] ->
        handle_status(opts, s)
    end
  end

  @doc """
  Commits the running transaction, or issues a save request for the current
  savepoint when `:mode` is `:savepoint`.

  Outside of a started transaction the current status is reported instead.
  """
  @spec handle_commit(Keyword.t(), t) ::
          {:ok, Tds.Result.t(), new_state :: t}
          | {DBConnection.status(), new_state :: t}
          | {:disconnect, Exception.t(), new_state :: t}
  def handle_commit(opts, %{transaction: transaction, env: env} = s) do
    mode = Keyword.get(opts, :mode, :transaction)

    case mode do
      :savepoint when transaction == :started ->
        send_transaction("TM_SAVE_XACT", [name: env.savepoint], s)

      :transaction when transaction == :started ->
        committed = %{s | transaction: nil}
        send_transaction("TM_COMMIT_XACT", [], committed)

      other when other in [:transaction, :savepoint] ->
        handle_status(opts, s)
    end
  end

  @doc """
  Rolls back the whole transaction, or only to the current savepoint when
  `:mode` is `:savepoint`.

  Outside of a started or failed transaction the current status is reported
  instead.
  """
  @spec handle_rollback(Keyword.t(), t) ::
          {:ok, Tds.Result.t(), new_state :: t}
          | {:idle, new_state :: t}
          | {:disconnect, Exception.t(), new_state :: t}
  def handle_rollback(opts, %{env: env, transaction: transaction} = s) do
    mode = Keyword.get(opts, :mode, :transaction)

    case mode do
      :savepoint when transaction in [:started, :failed] ->
        # The enclosing transaction stays open after a savepoint rollback.
        resumed = %{s | transaction: :started}
        send_transaction("TM_ROLLBACK_XACT", [name: env.savepoint], resumed)

      :transaction when transaction in [:started, :failed] ->
        # A full rollback also resets the savepoint counter.
        idle = %{s | transaction: nil, env: %{env | savepoint: 0}}

        send_transaction(
          "TM_ROLLBACK_XACT",
          [name: 0, isolation_level: :read_committed],
          idle
        )

      other when other in [:transaction, :savepoint] ->
        handle_status(opts, s)
    end
  end

  @doc """
  Maps the stored transaction status onto the status atom reported to the
  caller: `:idle`, `:transaction` or `:error`.
  """
  @spec handle_status(Keyword.t(), t) ::
          {:idle | :transaction | :error, t}
          | {:disconnect, Exception.t(), t}
  def handle_status(_, %{transaction: transaction} = state) do
    status =
      case transaction do
        :started -> :transaction
        :failed -> :error
        finished when finished in [nil, :successful] -> :idle
      end

    {status, state}
  end

  @doc """
  Always returns an error: cursors are not supported.
  """
  @spec handle_fetch(
          Query.t(),
          cursor :: any(),
          opts :: Keyword.t(),
          state :: t()
        ) ::
          {:cont | :halt, Tds.Result.t(), new_state :: t()}
          | {:error | :disconnect, Exception.t(), new_state :: t()}
  def handle_fetch(_query, _cursor, _opts, state) do
    unsupported = Tds.Error.exception("Cursor is not supported by TDS")
    {:error, unsupported, state}
  end

  @doc """
  Always returns an error: cursors are not supported.
  """
  @spec handle_deallocate(
          query :: Query.t(),
          cursor :: any,
          opts :: Keyword.t(),
          state :: t()
        ) ::
          {:ok, Tds.Result.t(), new_state :: t()}
          | {:error | :disconnect, Exception.t(), new_state :: t()}
  def handle_deallocate(_query, _cursor, _opts, state) do
    unsupported = Tds.Error.exception("Cursor operations are not supported in TDS")
    {:error, unsupported, state}
  end

  @doc """
  Always returns an error: cursors are not supported.
  """
  @spec handle_declare(Query.t(), params :: any, opts :: Keyword.t(), state :: t) ::
          {:ok, Query.t(), cursor :: any, new_state :: t}
          | {:error | :disconnect, Exception.t(), new_state :: t}
  def handle_declare(_query, _params, _opts, state) do
    unsupported = Tds.Error.exception("Cursor operations are not supported in TDS")
    {:error, unsupported, state}
  end

  # CONNECTION

  # Asks the service listening on UDP port 1434 of the configured host for its
  # instance list and resolves the TCP port of `opts[:instance]`.
  #
  # Returns `{:ok, state}` with `:itcp` set, or `{:error, exception}`. The wait
  # for the reply is bounded by the `:timeout` option (default `@timeout`) so
  # an unreachable or silent host cannot block the caller forever, and the UDP
  # socket is closed on every failure path.
  defp instance(opts, s) do
    host = Keyword.fetch!(opts, :hostname)
    host = if is_binary(host), do: String.to_charlist(host), else: host
    timeout = opts[:timeout] || @timeout

    case :gen_udp.open(0, [:binary, {:active, false}, {:reuseaddr, true}]) do
      {:ok, sock} ->
        with :ok <- :gen_udp.send(sock, host, 1434, <<3>>),
             {:ok, msg} <- :gen_udp.recv(sock, 0, timeout) do
          # parse_udp/2 closes the socket itself.
          parse_udp(msg, %{s | opts: opts, usock: sock})
        else
          {:error, reason} ->
            :gen_udp.close(sock)
            {:error, %Tds.Error{message: "udp connect: #{inspect(reason)}"}}
        end

      {:error, error} ->
        {:error, %Tds.Error{message: "udp connect: #{error}"}}
    end
  end

  # Opens the TCP socket, enlarges its user-level buffer to the largest of the
  # socket's send/receive/user buffer sizes and starts the prelogin/login
  # handshake.
  #
  # The port is taken from the resolved instance port, then `opts[:port]`,
  # then the `MSSQLPORT` environment variable, then 1433.
  defp connect(opts, s) do
    host = Keyword.fetch!(opts, :hostname)
    host = if is_binary(host), do: String.to_charlist(host), else: host

    port = s.itcp || opts[:port] || System.get_env("MSSQLPORT") || 1433
    {port, _} = if is_binary(port), do: Integer.parse(port), else: {port, nil}

    timeout = opts[:timeout] || @timeout

    sock_opts = @sock_opts ++ (opts[:socket_options] || [])

    s = %{s | opts: opts}

    # Initialize TCP connection with the SQL Server
    case :gen_tcp.connect(host, port, sock_opts, timeout) do
      {:ok, sock} ->
        with {:ok, buffers} <- :inet.getopts(sock, [:sndbuf, :recbuf, :buffer]),
             :ok <- :inet.setopts(sock, buffer: max_buf_size(buffers)) do
          # Send Prelogin message to SQL Server
          case send_prelogin(%{s | sock: {:gen_tcp, sock}}) do
            {:error, error, _state} ->
              :gen_tcp.close(sock)
              {:error, error}

            other ->
              other
          end
        else
          {:error, error} ->
            # The socket is already open here; close it so a failed buffer
            # setup does not leak it.
            :gen_tcp.close(sock)
            {:error, %Tds.Error{message: "tcp connect: #{error}"}}
        end

      {:error, error} ->
        {:error, %Tds.Error{message: "tcp connect: #{error}"}}
    end
  end

  # Parses the reply received from UDP port 1434 and picks the entry whose
  # `InstanceName` equals `opts[:instance]` (case-insensitive).
  #
  # After a 3 byte head the payload is a list of entries separated by `;;`,
  # each entry being `key;value;key;value...`. Keys are kept as lower-cased
  # strings: they come from the network, so they must not be turned into
  # atoms. Malformed entries (dangling key, missing instance name) are
  # tolerated, and an entry without a usable `tcp` port yields an error.
  defp parse_udp(
         {_, 1434, <<_head::binary-3, data::binary>>},
         %{opts: opts, usock: sock} = s
       ) do
    :gen_udp.close(sock)

    wanted = opts[:instance] |> to_string() |> String.downcase()

    server =
      data
      |> String.split(";;", trim: true)
      |> Enum.map(fn entry ->
        entry
        |> String.split(";")
        |> Enum.chunk_every(2, 2, :discard)
        |> Enum.reduce(%{}, fn [key, value], acc ->
          # First occurrence of a key wins.
          Map.put_new(acc, String.downcase(key), value)
        end)
      end)
      |> Enum.find(fn info ->
        String.downcase(Map.get(info, "instancename", "")) == wanted
      end)

    case server do
      nil ->
        {:error, %Tds.Error{message: "Instance #{opts[:instance]} not found"}}

      info ->
        case Integer.parse(Map.get(info, "tcp", "")) do
          {port, _rest} ->
            {:ok, %{s | opts: opts, itcp: port, usock: nil}}

          :error ->
            message = "Instance #{opts[:instance]} does not expose a TCP port"
            {:error, %Tds.Error{message: message}}
        end
    end
  end

  # Upgrades the plain TCP socket to TLS. On failure the TCP socket is closed
  # and an error tuple carrying the unchanged state is returned.
  defp ssl_connect(%{sock: {:gen_tcp, sock}, opts: opts} = s) do
    {:ok, _} = Application.ensure_all_started(:ssl)
    ssl_opts = opts[:ssl_opts] || []

    case Tds.Tls.connect(sock, ssl_opts) do
      {:error, reason} ->
        message = "Unable to establish secure connection to server due #{inspect(reason)}"
        error = Tds.Error.exception(message)
        :gen_tcp.close(sock)
        {:error, error, s}

      {:ok, ssl_sock} ->
        {:ok, %{s | sock: {:ssl, ssl_sock}}}
    end
  end

  # UDP error reported while talking to the SQL Server Browser.
  def handle_info({:udp_error, _, :econnreset}, s) do
    text =
      "Tds encountered an error while connecting to the Sql Server " <>
        "Browser: econnreset"

    {:stop, Tds.Error.exception(text), s}
  end

  # Data arriving while in the prelogin state: go passive and continue with
  # the login step.
  def handle_info(
        {:tcp, _, _data},
        %{sock: {_mod, _sock}, opts: _opts, state: :prelogin} = s
      ) do
    setopts(s.sock, active: false)
    login(s)
  end

  # The peer closed the socket.
  def handle_info({tag, _}, s) when tag == :tcp_closed or tag == :ssl_closed do
    {:stop, Tds.Error.exception("tcp closed"), s}
  end

  # The transport reported an error on the socket.
  def handle_info({tag, _, reason}, s) when tag == :tcp_error or tag == :ssl_error do
    error = Tds.Error.exception("tcp error: #{reason}")
    {:stop, error, s}
  end

  # Catch-all: logs any message no other clause handled and keeps the state
  # unchanged.
  def handle_info(msg, s) do
    Logger.error(fn ->
      "Unhandled message! \n\n" <>
        "  Tds.Protocol.handle_info/2 \n\n" <>
        "    Arg #1 \n" <>
        inspect(msg) <>
        "\n    Arg #2 \n" <>
        inspect(s)
    end)

    {:ok, s}
  end

  # Parses a complete message for the current protocol state and hands it to
  # `message/3`. A result returned alongside the state is dropped here; callers
  # read it from the state instead.
  defp decode(packet_data, %{state: state} = s) do
    {msg, parsed} = parse(state, packet_data, s)

    case message(state, msg, parsed) do
      {:error, _, _} = failure -> failure
      {:ok, next} -> {:ok, next}
      {:ok, _result, next} -> {:ok, next}
    end
  end

  # Removes one pending message of this connection's socket from the process
  # mailbox, without blocking.
  #
  # `s.sock` is a `{transport, socket}` tuple, while socket messages carry the
  # bare socket, so the socket has to be unwrapped before it is pinned. The
  # payload of a data message is dropped: the connection is being torn down
  # and the decoded value was never used.
  defp flush(%{sock: {_mod, sock}} = s) do
    receive do
      {tag, ^sock, _data} when tag in [:tcp, :ssl] ->
        {:ok, s}

      {tag, ^sock} when tag in [:tcp_closed, :ssl_closed] ->
        {:disconnect, %Tds.Error{message: "tcp closed"}, s}

      {tag, ^sock, reason} when tag in [:tcp_error, :ssl_error] ->
        {:disconnect, %Tds.Error{message: "tcp error: #{inspect(reason)}"}, s}
    after
      0 ->
        # There might not be any socket messages.
        {:ok, s}
    end
  end

  # PROTOCOL

  # Sends the prelogin message and, when it is acknowledged, continues with
  # the login step. Any other outcome is returned unchanged.
  def send_prelogin(%{opts: opts} = s) do
    msg = msg_prelogin(params: opts)
    reply = msg_send(msg, %{s | state: :prelogin})

    case reply do
      {:ok, acked} -> login(acked)
      failure -> failure
    end
  end

  # Sends the login message; on success the connection is marked `:ready`.
  # Any other outcome is returned unchanged.
  def login(%{opts: opts} = s) do
    msg = msg_login(params: opts)

    case login_send(msg, %{s | state: :login}) do
      {:ok, logged_in} -> {:ok, %{logged_in | state: :ready}}
      failure -> failure
    end
  end

  # Sends `statement` as a SQL batch and returns the result stored in the
  # state. An error raised inside a started transaction marks it `:failed`.
  defp send_query(statement, s) do
    msg = msg_sql(query: statement)
    reply = msg_send(msg, %{s | state: :executing})

    case reply do
      {:ok, %{result: result} = done} ->
        {:ok, result, done}

      {:error, err, %{transaction: :started} = failed} ->
        {:error, err, %{failed | transaction: :failed}}

      other ->
        other
    end
  end

  # Prepares `statement` on the server through the `:sp_prepare` procedure.
  #
  # `params` is the parameter declaration sent as `@params`. On success the
  # query stored in the state is returned with `statement` filled in and the
  # state moves to `:executing`. An error raised inside a started transaction
  # marks it `:failed`.
  def send_prepare(statement, params, s) do
    handle_param = %Tds.Parameter{
      name: "@handle",
      type: :integer,
      direction: :output,
      value: nil
    }

    rpc_params = [
      handle_param,
      %Tds.Parameter{name: "@params", type: :string, value: params},
      %Tds.Parameter{name: "@stmt", type: :string, value: statement}
    ]

    msg = msg_rpc(proc: :sp_prepare, query: statement, params: rpc_params)
    reply = msg_send(msg, %{s | state: :prepare})

    case reply do
      {:ok, %{query: prepared} = done} ->
        {:ok, %{prepared | statement: statement}, %{done | state: :executing}}

      {:error, err, %{transaction: :started} = failed} ->
        {:error, err, %{failed | transaction: :failed}}

      other ->
        other
    end
  end

  # Sends a transaction manager request (`command`) built from the `:name` and
  # `:isolation_level` entries of `payload`.
  #
  # Returns `{:ok, result, state}` on success. Every failure ends in a
  # `:disconnect` tuple, including receive failures reported by `msg_send/2`,
  # which previously matched no clause and raised `CaseClauseError`.
  def send_transaction(command, payload, s) do
    msg =
      msg_transmgr(
        command: command,
        name: Keyword.get(payload, :name),
        isolation_level: Keyword.get(payload, :isolation_level)
      )

    case msg_send(msg, %{s | state: :transaction_manager}) do
      {:ok, %{result: result} = s} ->
        {:ok, result, s}

      {:error, err, s} ->
        {:disconnect, err, s}

      {:error, reason} ->
        # A failed socket send yields a bare reason; the caller expects an
        # exception struct.
        err =
          if Exception.exception?(reason) do
            reason
          else
            Tds.Error.exception("Failed to send #{command} due #{inspect(reason)}")
          end

        {:disconnect, err, s}

      {:disconnect, _ex, _s} = disconnect ->
        disconnect

      # msg_send/2 may hand back a receive failure wrapped as `{0, disconnect}`.
      {0, {:disconnect, _ex, _s} = disconnect} ->
        disconnect
    end
  end

  # Executes a parameterized query.
  #
  # Without a prepared handle the statement is run through `:sp_executesql`
  # together with its parameter declaration; with a handle it is run through
  # `:sp_execute`. The message is sent with the protocol state the caller
  # left in `s`. An error raised inside a started transaction marks it
  # `:failed`.
  #
  # This used to be two clauses (one matching a started transaction) with
  # identical bodies; they are merged into one.
  @spec send_param_query(Tds.Query.t(), list(), t) ::
          {:ok, term, t}
          | {:error | :disconnect, term, t}
          | {:error, term}
  defp send_param_query(
         %Query{handle: handle, statement: statement} = _,
         params,
         s
       ) do
    msg =
      case handle do
        nil ->
          p = [
            %Parameter{
              name: "@statement",
              type: :string,
              direction: :input,
              value: statement
            },
            %Parameter{
              name: "@params",
              type: :string,
              direction: :input,
              value: Parameter.prepared_params(params)
            }
            | Parameter.prepare_params(params)
          ]

          msg_rpc(proc: :sp_executesql, params: p)

        handle ->
          p = [
            %Parameter{
              name: "@handle",
              type: :integer,
              direction: :input,
              value: handle
            }
            | Parameter.prepare_params(params)
          ]

          msg_rpc(proc: :sp_execute, params: p)
      end

    case msg_send(msg, s) do
      {:ok, %{result: result} = s} ->
        {:ok, result, %{s | state: :ready}}

      {:error, err, %{transaction: :started} = s} ->
        {:error, err, %{s | transaction: :failed}}

      err ->
        err
    end
  end

  # Releases a prepared statement handle through the `:sp_unprepare`
  # procedure. The second argument is ignored. An error raised inside a
  # started transaction marks it `:failed`.
  def send_close(%Query{handle: handle} = _query, _params, s) do
    handle_param = %Tds.Parameter{
      name: "@handle",
      type: :integer,
      direction: :input,
      value: handle
    }

    msg = msg_rpc(proc: :sp_unprepare, params: [handle_param])
    reply = msg_send(msg, s)

    case reply do
      {:ok, %{result: result} = done} ->
        {:ok, result, %{done | state: :ready}}

      {:error, err, %{transaction: :started} = failed} ->
        {:error, err, %{failed | transaction: :failed}}

      other ->
        other
    end
  end

  # Prelogin acknowledgement: continue with the login, upgrade to TLS first
  # when encryption was requested, or pass any other response through.
  def message(:prelogin, msg_preloginack(response: response), _) do
    case response do
      {:encrypt, s} -> ssl_connect(s)
      {:login, s} -> {:ok, s}
      other -> other
    end
  end

  # Login acknowledgement carrying a redirect: connect again using the
  # redirect's host and port.
  def message(
        :login,
        msg_loginack(redirect: %{hostname: host, port: port}),
        %{opts: opts}
      ) do
    with_host = Keyword.put(opts, :hostname, host)
    redirected = Keyword.put(with_host, :port, port)
    connect(redirected)
  end

  # Plain login acknowledgement: redact the password in the stored options
  # and apply the session settings built from the original options.
  def message(:login, msg_loginack(), %{opts: opts} = s) do
    session_sql = IO.iodata_to_binary(conn_opts(opts))
    send_query(session_sql, %{s | opts: clean_opts(opts)})
  end

  # Result of an executed statement. The `:resultset` flag is read from the
  # process dictionary (defaulting to `false`) and decides the shape of the
  # stored result.
  def message(:executing, msg_result(set: set), s) do
    resultset? = Process.get(:resultset, false)

    result =
      case {set, resultset?} do
        # every result set, as a list
        {r, true} -> r
        # only the first result set, or an empty result when there is none
        {r, false} -> List.first(r) || %Tds.Result{rows: nil}
        # `_false` is an ordinary variable, so this clause is only reached
        # when the flag is neither `true` nor `false`
        {[h | _t], _false} -> h
      end

    {:ok, mark_ready(%{s | result: result})}
  end

  # Transaction manager response: store an empty result and go back to
  # `:ready`.
  def message(:transaction_manager, msg_trans(), s) do
    empty = %Tds.Result{columns: [], rows: [], num_rows: 0}
    {:ok, %{s | state: :ready, result: empty}}
  end

  # Prepare response: the statement handle is the value of the `@handle`
  # output parameter (nil when that parameter is absent).
  def message(:prepare, msg_prepared(params: params), %{} = s) do
    handle_param =
      Enum.find(params, %{}, fn param ->
        param.name == "@handle" and param.direction == :output
      end)

    handle = Map.get(handle_param, :value, nil)
    empty = %Tds.Result{columns: [], rows: [], num_rows: 0}
    prepared = %Tds.Query{handle: handle}

    {:ok, mark_ready(%{s | result: empty, query: prepared})}
  end

  ## Error
  # An error message from the server, in any protocol state: wrap it in a
  # `Tds.Error` and mark the connection ready again.
  def message(_, msg_error(error: e), %{} = s) do
    error = %Tds.Error{mssql: e}
    {:error, error, mark_ready(s)}
  end

  ## ATTN Ack
  # Attention acknowledgement: store an empty result and go back to `:ready`.
  def message(:attn, _, %{} = s) do
    result = %Tds.Result{columns: [], rows: [], num_rows: 0}

    # The connection struct has no `:statement` key, so updating it with the
    # `%{map | key: value}` syntax raised `KeyError`; only existing keys are
    # updated here.
    {:ok, %{s | state: :ready, result: result}}
  end

  # Puts the connection back into the `:ready` protocol state.
  defp mark_ready(%{state: _} = s), do: Map.replace!(s, :state, :ready)

  # Send Command To Sql Server
  #
  # Writes every packet of the encoded login message to the socket (send
  # results are not checked), then reads the reply and decodes it in the
  # `:login` state. When receiving fails, the password is redacted in the
  # options of the returned state.
  defp login_send(msg, %{sock: {mod, sock}, env: env, opts: opts} = s) do
    for pak <- encode_msg(msg, env), do: mod.send(sock, pak)

    case msg_recv(s) do
      {:disconnect, ex, failed} ->
        {:disconnect, ex, %{failed | opts: clean_opts(opts)}}

      buffer ->
        decode(IO.iodata_to_binary(buffer), %{s | state: :login})
    end
  end

  # Sends `msg` to the server and decodes the reply in the protocol state
  # carried by `s`.
  #
  # The socket is switched to passive mode first and the
  # `:use_elixir_calendar_types` option is applied before encoding. Sending
  # stops at the first chunk that fails.
  #
  # Returns whatever `decode/2` returns, `{:error, reason}` when a chunk could
  # not be sent, or the `{:disconnect, exception, state}` tuple produced by a
  # failed receive. That tuple used to be wrapped as `{0, tuple}`, a shape no
  # caller recognises.
  defp msg_send(
         msg,
         %{sock: {mod, port}, env: env, opts: opts} = s
       ) do
    setopts(s.sock, active: false)

    opts
    |> Keyword.get(:use_elixir_calendar_types, false)
    |> use_elixir_calendar_types()

    send_result =
      msg
      |> encode_msg(env)
      |> Enum.reduce_while(:ok, fn chunk, _ ->
        case mod.send(port, chunk) do
          {:error, reason} -> {:halt, {:error, reason}}
          :ok -> {:cont, :ok}
        end
      end)

    # Values that do not match fall through unchanged.
    with :ok <- send_result,
         buffer when is_list(buffer) <- msg_recv(s) do
      buffer
      |> IO.iodata_to_binary()
      |> decode(s)
    end
  end

  # Reads one complete message from the socket.
  #
  # Returns the message payload as a list of binary chunks in arrival order,
  # or `{:disconnect, error, state}` when receiving fails. Errors raised by the
  # recursive msg_recv/2 steps arrive here as throws; in that case `error` is
  # the bare reason reported by the transport, not an exception struct.
  defp msg_recv(%{sock: {mod, pid}} = s) do
    # length 0: take whatever bytes are currently available
    case mod.recv(pid, 0) do
      {:ok, pkg} ->
        pkg
        |> next_tds_pkg([])
        |> msg_recv(s)

      {:error, error} ->
        {:disconnect,
         %Tds.Error{
           message: "Connection failed to receive packet due #{inspect(error)}"
         }, s}
    end
  catch
    {:error, error} -> {:disconnect, error, s}
  end

  # The message is complete. Chunks were prepended while reading, so the
  # buffer is reversed to restore arrival order.
  defp msg_recv({:done, buffer, _}, _s) do
    Enum.reverse(buffer)
  end

  # The current packet still lacks `more` payload bytes. For the last packet
  # exactly that many bytes are requested; otherwise whatever is available is
  # taken, since further packets follow.
  defp msg_recv({:more, buffer, more, last?}, %{sock: {mod, pid}} = s) do
    take = if last?, do: more, else: 0

    case mod.recv(pid, take) do
      {:ok, pkg} ->
        next_tds_pkg(pkg, buffer, more, last?)
        |> msg_recv(s)

      {:error, error} ->
        throw({:error, error})
    end
  end

  # The bytes read so far did not match a packet header: read more and retry
  # the header match on the concatenation.
  defp msg_recv({:more, buffer, unknown_pkg}, %{sock: {mod, pid}} = s) do
    case mod.recv(pid, 0) do
      {:ok, pkg} ->
        unknown_pkg
        |> Kernel.<>(pkg)
        |> next_tds_pkg(buffer)
        |> msg_recv(s)

      {:error, error} ->
        throw({:error, error})
    end
  end

  # Matches a packet header at the start of `pkg`.
  #
  # The header is 8 bytes: the byte 0x04, a byte that is 0x01 for the last
  # packet of a message and 0x00 otherwise, a 16 bit size and four bytes that
  # are ignored here. The size counts the header, so the payload is
  # `size - 8` bytes.
  # NOTE(review): 0x04 and 0x01 are presumably the TDS packet type and
  # end-of-message status — confirm against the protocol definition.
  #
  # Anything that does not match (for example fewer than 8 bytes) is handed
  # back as `{:more, buffer, unknown_pkg}` so the caller can read more bytes.
  defp next_tds_pkg(pkg, buffer) do
    case pkg do
      <<0x04, 0x01, size::int16(), _::int32(), chunk::binary>> ->
        more = size - 8
        next_tds_pkg(chunk, buffer, more, true)

      <<0x04, 0x00, size::int16(), _::int32(), chunk::binary>> ->
        more = size - 8
        next_tds_pkg(chunk, buffer, more, false)

      unknown_pkg ->
        {:more, buffer, unknown_pkg}
    end
  end

  # Payload of the last packet: once `more` bytes are available the message is
  # done; otherwise the partial chunk is buffered and the remaining byte count
  # is reported.
  defp next_tds_pkg(pkg, buffer, more, true) do
    case pkg do
      <<chunk::binary(more, 8), tail::binary>> ->
        {:done, [chunk | buffer], tail}

      <<chunk::binary>> ->
        more = more - byte_size(chunk)
        {:more, [chunk | buffer], more, true}
    end
  end

  # Payload of a packet that is not the last: once `more` bytes are available
  # the rest of the data is parsed as the next packet header; otherwise the
  # partial chunk is buffered and the remaining byte count is reported.
  defp next_tds_pkg(pkg, buffer, more, false) do
    case pkg do
      <<chunk::binary(more, 8), tail::binary>> ->
        next_tds_pkg(tail, [chunk | buffer])

      <<chunk::binary>> ->
        more = more - byte_size(chunk)
        {:more, [chunk | buffer], more, false}
    end
  end

  # Replaces the password in `opts` with a placeholder so it is not kept in
  # the connection state.
  defp clean_opts(opts) do
    [{:password, :REDACTED} | Keyword.delete(opts, :password)]
  end

  # Builds the list of `SET` statements sent right after login: a fixed set of
  # defaults followed by the statements derived from the `:set_*` options, in
  # the order listed below. Raises `ArgumentError` on an invalid option value.
  @spec conn_opts(Keyword.t()) :: list() | no_return
  defp conn_opts(opts) do
    defaults = [
      "SET ANSI_NULLS ON; ",
      "SET QUOTED_IDENTIFIER ON; ",
      "SET CURSOR_CLOSE_ON_COMMIT OFF; ",
      "SET ANSI_NULL_DFLT_ON ON; ",
      "SET ANSI_PADDING ON; ",
      "SET ANSI_WARNINGS ON; ",
      "SET CONCAT_NULL_YIELDS_NULL ON; ",
      "SET TEXTSIZE 2147483647; "
    ]

    settings = [
      :set_language,
      :set_datefirst,
      :set_dateformat,
      :set_deadlock_priority,
      :set_lock_timeout,
      :set_remote_proc_transactions,
      :set_implicit_transactions,
      :set_transaction_isolation_level,
      :set_allow_snapshot_isolation
    ]

    Enum.reduce(settings, defaults, fn setting, statements ->
      append_opts(statements, opts, setting)
    end)
  end

  # Appends `SET LANGUAGE` when the `:set_language` option is present.
  defp append_opts(conn, opts, :set_language) do
    opts
    |> Keyword.get(:set_language)
    |> case do
      nil -> conn
      language -> conn ++ ["SET LANGUAGE #{language}; "]
    end
  end

  # Appends `SET DATEFIRST` for an integer between 1 and 7; raises
  # `ArgumentError` for any other non-nil value.
  defp append_opts(conn, opts, :set_datefirst) do
    opts
    |> Keyword.get(:set_datefirst)
    |> case do
      nil ->
        conn

      day when is_integer(day) and day >= 1 and day <= 7 ->
        conn ++ ["SET DATEFIRST #{day}; "]

      invalid ->
        raise(
          ArgumentError,
          "set_datefirst: #{inspect(invalid)} is out of bounds, valid range is 1..7"
        )
    end
  end

  # Appends `SET DATEFORMAT` for one of the supported orderings; raises
  # `ArgumentError` for any other non-nil value.
  defp append_opts(conn, opts, :set_dateformat) do
    opts
    |> Keyword.get(:set_dateformat)
    |> case do
      nil ->
        conn

      format when format in ~w(mdy dmy ymd ydm myd dym)a ->
        conn ++ ["SET DATEFORMAT #{format}; "]

      invalid ->
        raise(
          ArgumentError,
          "set_dateformat: #{inspect(invalid)} is an invalid value, " <>
            "valid values are [:mdy, :dmy, :ymd, :ydm, :myd, :dym]"
        )
    end
  end

  # Appends `SET DEADLOCK_PRIORITY` for `:low`, `:normal`, `:high` or an
  # integer between -10 and 10; raises `ArgumentError` for any other non-nil
  # value. The duplicated, unreachable `nil` clause has been removed.
  defp append_opts(conn, opts, :set_deadlock_priority) do
    case Keyword.get(opts, :set_deadlock_priority) do
      nil ->
        conn

      val when val in [:low, :high, :normal] or val in -10..10 ->
        conn ++ ["SET DEADLOCK_PRIORITY #{val}; "]

      val ->
        raise(
          ArgumentError,
          "set_deadlock_priority: #{inspect(val)} is an invalid value, " <>
            "valid values are #{inspect([:low, :high, :normal] ++ [-10..10])}"
        )
    end
  end

  # Appends `SET LOCK_TIMEOUT` for a positive integer; raises `ArgumentError`
  # for any other non-nil value.
  defp append_opts(conn, opts, :set_lock_timeout) do
    case Keyword.get(opts, :set_lock_timeout) do
      nil ->
        conn

      # `is_integer/1` is required: in term ordering every atom and binary
      # compares greater than 0, so `val > 0` alone lets them through.
      val when is_integer(val) and val > 0 ->
        conn ++ ["SET LOCK_TIMEOUT #{val}; "]

      val ->
        raise(
          ArgumentError,
          "set_lock_timeout: #{inspect(val)} is an invalid value, " <>
            "must be an positive integer."
        )
    end
  end

  # Appends `SET REMOTE_PROC_TRANSACTIONS ON|OFF` when the option is `:on` or
  # `:off`; raises `ArgumentError` for any other non-nil value.
  defp append_opts(conn, opts, :set_remote_proc_transactions) do
    opts
    |> Keyword.get(:set_remote_proc_transactions)
    |> case do
      nil ->
        conn

      flag when flag in [:on, :off] ->
        switch = String.upcase(Atom.to_string(flag))
        conn ++ ["SET REMOTE_PROC_TRANSACTIONS #{switch}; "]

      invalid ->
        raise(
          ArgumentError,
          "set_remote_proc_transactions: #{inspect(invalid)} is an invalid value, " <>
            "should be either :on, :off, nil"
        )
    end
  end

  # Appends `SET IMPLICIT_TRANSACTIONS ON|OFF`. Unlike the other settings an
  # absent option still emits a statement, turning implicit transactions off.
  defp append_opts(conn, opts, :set_implicit_transactions) do
    opts
    |> Keyword.get(:set_implicit_transactions)
    |> case do
      nil ->
        conn ++ ["SET IMPLICIT_TRANSACTIONS OFF; "]

      flag when flag in [:on, :off] ->
        switch = String.upcase(Atom.to_string(flag))
        conn ++ ["SET IMPLICIT_TRANSACTIONS #{switch}; "]

      invalid ->
        raise(
          ArgumentError,
          "set_implicit_transactions: #{inspect(invalid)} is an invalid value, " <>
            "should be either :on, :off, nil"
        )
    end
  end

  # Appends `SET TRANSACTION ISOLATION LEVEL` for one of the supported levels;
  # the atom is upper-cased and its underscores become spaces. Raises
  # `ArgumentError` for any other non-nil value.
  defp append_opts(conn, opts, :set_transaction_isolation_level) do
    opts
    |> Keyword.get(:set_transaction_isolation_level)
    |> case do
      nil ->
        conn

      level when level in @trans_levels ->
        name = String.upcase(String.replace(Atom.to_string(level), "_", " "))
        conn ++ ["SET TRANSACTION ISOLATION LEVEL #{name}; "]

      invalid ->
        raise(
          ArgumentError,
          "set_transaction_isolation_level: #{inspect(invalid)} is an invalid value, " <>
            "should be one of #{inspect(@trans_levels)} or nil"
        )
    end
  end

  # Appends an `ALTER DATABASE ... SET ALLOW_SNAPSHOT_ISOLATION ON|OFF`
  # statement for the configured `:database` when the option is `:on` or
  # `:off`; raises `ArgumentError` for any other non-nil value.
  defp append_opts(conn, opts, :set_allow_snapshot_isolation) do
    database = Keyword.get(opts, :database)

    opts
    |> Keyword.get(:set_allow_snapshot_isolation)
    |> case do
      nil ->
        conn

      flag when flag in [:on, :off] ->
        switch = String.upcase(Atom.to_string(flag))
        statement = "ALTER DATABASE [#{database}] SET ALLOW_SNAPSHOT_ISOLATION #{switch}; "
        conn ++ [statement]

      invalid ->
        raise(
          ArgumentError,
          "set_allow_snapshot_isolation: #{inspect(invalid)} is an invalid value, " <>
            "should be either :on, :off, nil"
        )
    end
  end

  # Applies socket `options` through the module matching the transport:
  # `:inet` for plain TCP sockets, `:ssl` for TLS sockets.
  defp setopts({transport, socket}, options) do
    case transport do
      :ssl -> :ssl.setopts(socket, options)
      :gen_tcp -> :inet.setopts(socket, options)
    end
  end

  # Returns the largest value found in a keyword list of buffer sizes.
  defp max_buf_size(buffers) when is_list(buffers) do
    Enum.max(Keyword.values(buffers))
  end
end