lib/exqlite/connection.ex

defmodule Exqlite.Connection do
  @moduledoc """
  This module imlements connection details as defined in DBProtocol.

  ## Attributes

  - `db` - The sqlite3 database reference.
  - `path` - The path that was used to open.
  - `transaction_status` - The status of the connection. Can be `:idle` or `:transaction`.

  ## Unknowns

  - How are pooled connections going to work? Since sqlite3 doesn't allow for
    simultaneous access. We would need to check if the write ahead log is
    enabled on the database. We can't assume and set the WAL pragma because the
    database may be stored on a network volume which would cause potential
    issues.

  Notes:
    - we try to closely follow structure and naming convention of myxql.
    - sqlite thrives when there are many small conventions, so we may not implement
      some strategies employed by other adapters. See https://sqlite.org/np1queryprob.html
  """

  use DBConnection
  alias Exqlite.Error
  alias Exqlite.Pragma
  alias Exqlite.Query
  alias Exqlite.Result
  alias Exqlite.Sqlite3

  defstruct [
    :db,
    :path,
    :transaction_status,
    :status,
    :chunk_size
  ]

  @type t() :: %__MODULE__{
          db: Sqlite3.db(),
          path: String.t(),
          transaction_status: :idle | :transaction,
          status: :idle | :busy
        }

  @impl true
  @doc """
  Initializes the Ecto Exqlite adapter.

  For connection configurations we use the defaults that come with SQLite3, but
  we recommend which options to choose. We do not default to the recommended
  because we don't know what your environment is like.

  Allowed options:

    * `:database` - The path to the database. In memory is allowed. You can use
      `:memory` or `":memory:"` to designate that.
    * `:journal_mode` - Sets the journal mode for the sqlite connection. Can be
      one of the following `:delete`, `:truncate`, `:persist`, `:memory`,
      `:wal`, or `:off`. Defaults to `:delete`. It is recommended that you use
      `:wal` due to support for concurrent reads. Note: `:wal` does not mean
      concurrent writes.
    * `:temp_store` - Sets the storage used for temporary tables. Default is
      `:default`. Allowed values are `:default`, `:file`, `:memory`. It is
      recommended that you use `:memory` for storage.
    * `:synchronous` - Can be `:extra`, `:full`, `:normal`, or `:off`. Defaults
      to `:normal`.
    * `:foreign_keys` - Sets if foreign key checks should be enforced or not.
      Can be `:on` or `:off`. Default is `:on`.
    * `:cache_size` - Sets the cache size to be used for the connection. This is
      an odd setting as a positive value is the number of pages in memory to use
      and a negative value is the size in kilobytes to use. Default is `-2000`.
      It is recommended that you use `-64000`.
    * `:cache_spill` - The cache_spill pragma enables or disables the ability of
      the pager to spill dirty cache pages to the database file in the middle of
      a transaction. By default it is `:on`, and for most applications, it
      should remain so.
    * `:case_sensitive_like`
    * `:auto_vacuum` - Defaults to `:none`. Can be `:none`, `:full` or
      `:incremental`. Depending on the database size, `:incremental` may be
      beneficial.
    * `:locking_mode` - Defaults to `:normal`. Allowed values are `:normal` or
      `:exclusive`. See [sqlite documenation][1] for more information.
    * `:secure_delete` - Defaults to `:off`. If enabled, it will cause SQLite3
      to overwrite records that were deleted with zeros.
    * `:wal_auto_check_point` - Sets the write-ahead log auto-checkpoint
      interval. Default is `1000`. Setting the auto-checkpoint size to zero or a
      negative value turns auto-checkpointing off.
    * `:busy_timeout` - Sets the busy timeout in milliseconds for a connection.
      Default is `2000`.
    * `:chunk_size` - The chunk size for bulk fetching. Defaults to `50`.

  For more information about the options above, see [sqlite documenation][1]

  [1]: https://www.sqlite.org/pragma.html
  """
  def connect(options) do
    database = Keyword.get(options, :database)

    options =
      Keyword.put_new(
        options,
        :chunk_size,
        Application.get_env(:exqlite, :default_chunk_size, 50)
      )

    case database do
      nil ->
        {:error,
         %Error{
           message: """
           You must provide a :database to the database. \
           Example: connect(database: "./") or connect(database: :memory)\
           """
         }}

      :memory ->
        do_connect(":memory:", options)

      _ ->
        do_connect(database, options)
    end
  end

  @impl true
  def disconnect(_err, %__MODULE__{db: db}) do
    with :ok <- Sqlite3.close(db) do
      :ok
    else
      {:error, reason} -> {:error, %Error{message: reason}}
    end
  end

  @impl true
  def checkout(%__MODULE__{status: :idle} = state) do
    {:ok, %{state | status: :busy}}
  end

  def checkout(%__MODULE__{status: :busy} = state) do
    {:disconnect, %Error{message: "Database is busy"}, state}
  end

  @impl true
  def ping(state), do: {:ok, state}

  ##
  ## Handlers
  ##

  @impl true
  def handle_prepare(%Query{} = query, options, state) do
    with {:ok, query} <- prepare(query, options, state) do
      {:ok, query, state}
    end
  end

  @impl true
  def handle_execute(%Query{} = query, params, options, state) do
    with {:ok, query} <- prepare(query, options, state) do
      execute(:execute, query, params, state)
    end
  end

  @doc """
  Begin a transaction.

  For full info refer to sqlite docs: https://sqlite.org/lang_transaction.html

  Note: default transaction mode is DEFERRED.
  """
  @impl true
  def handle_begin(options, %{transaction_status: transaction_status} = state) do
    # TODO: This doesn't handle more than 2 levels of transactions.
    #
    # One possible solution would be to just track the number of open
    # transactions and use that for driving the transaction status being idle or
    # in a transaction.
    #
    # I do not know why the other official adapters do not track this and just
    # append level on the savepoint. Instead the rollbacks would just completely
    # revert the issues when it may be desirable to fix something while in the
    # transaction and then commit.
    case Keyword.get(options, :mode, :deferred) do
      :deferred when transaction_status == :idle ->
        handle_transaction(:begin, "BEGIN TRANSACTION", state)

      :transaction when transaction_status == :idle ->
        handle_transaction(:begin, "BEGIN TRANSACTION", state)

      :immediate when transaction_status == :idle ->
        handle_transaction(:begin, "BEGIN IMMEDIATE TRANSACTION", state)

      :exclusive when transaction_status == :idle ->
        handle_transaction(:begin, "BEGIN EXCLUSIVE TRANSACTION", state)

      mode
      when mode in [:deferred, :immediate, :exclusive, :savepoint] and
             transaction_status == :transaction ->
        handle_transaction(:begin, "SAVEPOINT exqlite_savepoint", state)
    end
  end

  @impl true
  def handle_commit(options, %{transaction_status: transaction_status} = state) do
    case Keyword.get(options, :mode, :deferred) do
      :savepoint when transaction_status == :transaction ->
        handle_transaction(
          :commit_savepoint,
          "RELEASE SAVEPOINT exqlite_savepoint",
          state
        )

      mode
      when mode in [:deferred, :immediate, :exclusive, :transaction] and
             transaction_status == :transaction ->
        handle_transaction(:commit, "COMMIT", state)
    end
  end

  @impl true
  def handle_rollback(options, %{transaction_status: transaction_status} = state) do
    case Keyword.get(options, :mode, :deferred) do
      :savepoint when transaction_status == :transaction ->
        with {:ok, _result, state} <-
               handle_transaction(
                 :rollback_savepoint,
                 "ROLLBACK TO SAVEPOINT exqlite_savepoint",
                 state
               ) do
          handle_transaction(
            :rollback_savepoint,
            "RELEASE SAVEPOINT exqlite_savepoint",
            state
          )
        end

      mode
      when mode in [:deferred, :immediate, :exclusive, :transaction] ->
        handle_transaction(:rollback, "ROLLBACK TRANSACTION", state)
    end
  end

  @doc """
  Close a query prepared by `c:handle_prepare/3` with the database. Return
  `{:ok, result, state}` on success and to continue,
  `{:error, exception, state}` to return an error and continue, or
  `{:disconnect, exception, state}` to return an error and disconnect.

  This callback is called in the client process.
  """
  @impl true
  def handle_close(query, _opts, state) do
    Sqlite3.release(state.db, query.ref)
    {:ok, nil, state}
  end

  @impl true
  def handle_declare(%Query{} = query, params, opts, state) do
    # We emulate cursor functionality by just using a prepared statement and
    # step through it. Thus we just return the query ref as the cursor.
    with {:ok, query} <- prepare_no_cache(query, opts, state),
         {:ok, query} <- bind_params(query, params, state) do
      {:ok, query, query.ref, state}
    end
  end

  @impl true
  def handle_deallocate(%Query{} = query, _cursor, _opts, state) do
    Sqlite3.release(state.db, query.ref)
    {:ok, nil, state}
  end

  @impl true
  def handle_fetch(%Query{statement: statement}, cursor, _opts, state) do
    case Sqlite3.step(state.db, cursor) do
      :done ->
        {
          :halt,
          %Result{
            rows: [],
            command: :fetch,
            num_rows: 0
          },
          state
        }

      {:row, row} ->
        {
          :cont,
          %Result{
            rows: [row],
            command: :fetch,
            num_rows: 1
          },
          state
        }

      :busy ->
        {:error, %Error{message: "Database busy", statement: statement}, state}

      {:error, reason} ->
        {:error, %Error{message: reason, statement: statement}, state}
    end
  end

  @impl true
  def handle_status(_opts, state) do
    {state.transaction_status, state}
  end

  ### ----------------------------------
  #     Internal functions and helpers
  ### ----------------------------------

  defp set_pragma(db, pragma_name, value) do
    Sqlite3.execute(db, "PRAGMA #{pragma_name} = #{value}")
  end

  defp get_pragma(db, pragma_name) do
    {:ok, statement} = Sqlite3.prepare(db, "PRAGMA #{pragma_name}")

    case Sqlite3.fetch_all(db, statement) do
      {:ok, [[value]]} -> {:ok, value}
      _ -> :error
    end
  end

  defp maybe_set_pragma(db, pragma_name, value) do
    case get_pragma(db, pragma_name) do
      {:ok, current} ->
        if current == value do
          :ok
        else
          set_pragma(db, pragma_name, value)
        end

      _ ->
        set_pragma(db, pragma_name, value)
    end
  end

  defp set_journal_mode(db, options) do
    maybe_set_pragma(db, "journal_mode", Pragma.journal_mode(options))
  end

  defp set_temp_store(db, options) do
    set_pragma(db, "temp_store", Pragma.temp_store(options))
  end

  defp set_synchronous(db, options) do
    set_pragma(db, "synchronous", Pragma.synchronous(options))
  end

  defp set_foreign_keys(db, options) do
    set_pragma(db, "foreign_keys", Pragma.foreign_keys(options))
  end

  defp set_cache_size(db, options) do
    maybe_set_pragma(db, "cache_size", Pragma.cache_size(options))
  end

  defp set_cache_spill(db, options) do
    set_pragma(db, "cache_spill", Pragma.cache_spill(options))
  end

  defp set_case_sensitive_like(db, options) do
    set_pragma(db, "case_sensitive_like", Pragma.case_sensitive_like(options))
  end

  defp set_auto_vacuum(db, options) do
    set_pragma(db, "auto_vacuum", Pragma.auto_vacuum(options))
  end

  defp set_locking_mode(db, options) do
    set_pragma(db, "locking_mode", Pragma.locking_mode(options))
  end

  defp set_secure_delete(db, options) do
    set_pragma(db, "secure_delete", Pragma.secure_delete(options))
  end

  defp set_wal_auto_check_point(db, options) do
    set_pragma(db, "wal_autocheckpoint", Pragma.wal_auto_check_point(options))
  end

  defp set_busy_timeout(db, options) do
    set_pragma(db, "busy_timeout", Pragma.busy_timeout(options))
  end

  defp do_connect(path, options) do
    with :ok <- mkdir_p(path),
         {:ok, db} <- Sqlite3.open(path),
         :ok <- set_journal_mode(db, options),
         :ok <- set_temp_store(db, options),
         :ok <- set_synchronous(db, options),
         :ok <- set_foreign_keys(db, options),
         :ok <- set_cache_size(db, options),
         :ok <- set_cache_spill(db, options),
         :ok <- set_auto_vacuum(db, options),
         :ok <- set_locking_mode(db, options),
         :ok <- set_secure_delete(db, options),
         :ok <- set_wal_auto_check_point(db, options),
         :ok <- set_case_sensitive_like(db, options),
         :ok <- set_busy_timeout(db, options) do
      state = %__MODULE__{
        db: db,
        path: path,
        transaction_status: :idle,
        status: :idle,
        chunk_size: Keyword.get(options, :chunk_size)
      }

      {:ok, state}
    else
      {:error, reason} ->
        {:error, %Exqlite.Error{message: reason}}
    end
  end

  def maybe_put_command(query, options) do
    case Keyword.get(options, :command) do
      nil -> query
      command -> %{query | command: command}
    end
  end

  # Attempt to retrieve the cached query, if it doesn't exist, we'll prepare one
  # and cache it for later.
  defp prepare(%Query{statement: statement} = query, options, state) do
    query = maybe_put_command(query, options)

    with {:ok, ref} <- Sqlite3.prepare(state.db, IO.iodata_to_binary(statement)),
         query <- %{query | ref: ref} do
      {:ok, query}
    else
      {:error, reason} ->
        {:error, %Error{message: reason, statement: statement}, state}
    end
  end

  # Prepare a query and do not cache it.
  defp prepare_no_cache(%Query{statement: statement} = query, options, state) do
    query = maybe_put_command(query, options)

    case Sqlite3.prepare(state.db, statement) do
      {:ok, ref} ->
        {:ok, %{query | ref: ref}}

      {:error, reason} ->
        {:error, %Error{message: reason, statement: statement}, state}
    end
  end

  @spec maybe_changes(Sqlite3.db(), Query.t()) :: integer() | nil
  defp maybe_changes(db, %Query{command: command})
       when command in [:update, :insert, :delete] do
    case Sqlite3.changes(db) do
      {:ok, total} -> total
      _ -> nil
    end
  end

  defp maybe_changes(_, _), do: nil

  # when we have an empty list of columns, that signifies that
  # there was no possible return tuple (e.g., update statement without RETURNING)
  # and in that case, we return nil to signify no possible result.
  defp maybe_rows([], []), do: nil
  defp maybe_rows(rows, _cols), do: rows

  defp execute(call, %Query{} = query, params, state) do
    with {:ok, query} <- bind_params(query, params, state),
         {:ok, columns} <- get_columns(query, state),
         {:ok, rows} <- get_rows(query, state),
         {:ok, transaction_status} <- Sqlite3.transaction_status(state.db),
         changes <- maybe_changes(state.db, query) do
      case query.command do
        command when command in [:delete, :insert, :update] ->
          {
            :ok,
            query,
            Result.new(
              command: call,
              num_rows: changes,
              rows: maybe_rows(rows, columns)
            ),
            %{state | transaction_status: transaction_status}
          }

        _ ->
          {
            :ok,
            query,
            Result.new(
              command: call,
              columns: columns,
              rows: rows,
              num_rows: Enum.count(rows)
            ),
            %{state | transaction_status: transaction_status}
          }
      end
    end
  end

  defp bind_params(%Query{ref: ref, statement: statement} = query, params, state)
       when ref != nil do
    case Sqlite3.bind(state.db, ref, params) do
      :ok ->
        {:ok, query}

      {:error, reason} ->
        {:error, %Error{message: reason, statement: statement}, state}
    end
  end

  defp get_columns(%Query{ref: ref, statement: statement}, state) do
    case Sqlite3.columns(state.db, ref) do
      {:ok, columns} ->
        {:ok, columns}

      {:error, reason} ->
        {:error, %Error{message: reason, statement: statement}, state}
    end
  end

  defp get_rows(%Query{ref: ref, statement: statement}, state) do
    case Sqlite3.fetch_all(state.db, ref, state.chunk_size) do
      {:ok, rows} ->
        {:ok, rows}

      {:error, reason} ->
        {:error, %Error{message: reason, statement: statement}, state}
    end
  end

  defp handle_transaction(call, statement, state) do
    with :ok <- Sqlite3.execute(state.db, statement),
         {:ok, transaction_status} <- Sqlite3.transaction_status(state.db) do
      result = %Result{
        command: call,
        rows: [],
        columns: [],
        num_rows: 0
      }

      {:ok, result, %{state | transaction_status: transaction_status}}
    else
      {:error, reason} ->
        {:disconnect, %Error{message: reason, statement: statement}, state}
    end
  end

  # SQLITE_OPEN_CREATE will create the DB file if not existing, but
  # will not create intermediary directories if they are missing.
  # So let's preemptively create the intermediate directories here
  # before trying to open the DB file.
  defp mkdir_p(":memory:"), do: :ok
  defp mkdir_p(path), do: File.mkdir_p(Path.dirname(path))
end