lib/repo.ex

defmodule Fly.Repo do
  @moduledoc """
  This wraps the built-in `Ecto.Repo` functions to proxy writable functions like
  insert, update and delete to be performed on the an Elixir node in the primary
  region.

  To use it, rename your existing repo module and add a new module with the same
  name as your original repo like this.


  Original code:

  ```elixir
  defmodule MyApp.Repo do
    use Ecto.Repo,
      otp_app: :my_app,
      adapter: Ecto.Adapters.Postgres
  end
  ```

  Changes to:

  ```elixir
  defmodule MyApp.Repo.Local do
    use Ecto.Repo,
      otp_app: :my_app,
      adapter: Ecto.Adapters.Postgres

    # Dynamically configure the database url based for runtime environment.
    def init(_type, config) do
      {:ok, Keyword.put(config, :url, Fly.Postgres.database_url())}
    end
  end

  defmodule Core.Repo do
    use Fly.Repo, local_repo: MyApp.Repo.Local
  end
  ```

  Using the same name allows your existing code to seamlessly work with the new
  repo.

  When explicitly managing database transactions like using Multi or
  `start_transaction`, when used to modify data, those functions should be
  called by an RPC so they run in the primary region.

  ```elixir
  Fly.RPC.rpc_region(:primary, {MyModule, :my_function_that_uses_multi, [my,
  args]}, opts)
  ```
  """

  defmacro __using__(opts) do
    quote bind_quoted: [opts: opts] do
      @local_repo Keyword.fetch!(opts, :local_repo)
      @timeout Keyword.get(opts, :timeout, 5_000)
      @replication_timeout Keyword.get(opts, :replication_timeout, 5_000)

      # Here we are injecting as little as possible then calling out to the
      # library functions.

      @doc """
      See `Ecto.Repo.config/0` for full documentation.
      """
      @spec config() :: Keyword.t()
      def config do
        @local_repo.config()
      end

      @doc """
      Calculate the given `aggregate`.

      See `Ecto.Repo.aggregate/3` for full documentation.
      """
      def aggregate(queryable, aggregate, opts \\ []) do
        unquote(__MODULE__).__exec_local__(:aggregate, [queryable, aggregate, opts])
      end

      @doc """
      Calculate the given `aggregate` over the given `field`.

      See `Ecto.Repo.aggregate/4` for full documentation.
      """
      def aggregate(queryable, aggregate, field, opts) do
        unquote(__MODULE__).__exec_local__(:aggregate, [queryable, aggregate, field, opts])
      end

      @doc """
      Fetches all entries from the data store matching the given query.

      See `Ecto.Repo.all/2` for full documentation.
      """
      def all(queryable, opts \\ []) do
        unquote(__MODULE__).__exec_local__(:all, [queryable, opts])
      end

      @doc """
      Deletes a struct using its primary key.

      See `Ecto.Repo.delete/2` for full documentation.
      """
      def delete(struct_or_changeset, opts \\ []) do
        unquote(__MODULE__).__exec_on_primary__(:delete, [struct_or_changeset, opts], opts)
      end

      @doc """
      Same as `delete/2` but returns the struct or raises if the changeset is invalid.

      See `Ecto.Repo.delete!/2` for full documentation.
      """
      def delete!(struct_or_changeset, opts \\ []) do
        unquote(__MODULE__).__exec_on_primary__(:delete!, [struct_or_changeset, opts], opts)
      end

      @doc """
      Deletes all entries matching the given query.

      See `Ecto.Repo.delete_all/2` for full documentation.
      """
      def delete_all(queryable, opts \\ []) do
        unquote(__MODULE__).__exec_on_primary__(:delete_all, [queryable, opts], opts)
      end

      @doc """
      Checks if there exists an entry that matches the given query.

      See `Ecto.Repo.exists?/2` for full documentation.
      """
      def exists?(queryable, opts \\ []) do
        unquote(__MODULE__).__exec_local__(:exists?, [queryable, opts])
      end

      @doc """
      Fetches a single struct from the data store where the primary key matches the given id.

      See `Ecto.Repo.get/3` for full documentation.
      """
      def get(queryable, id, opts \\ []) do
        unquote(__MODULE__).__exec_local__(:get, [queryable, id, opts])
      end

      @doc """
      Similar to `get/3` but raises `Ecto.NoResultsError` if no record was found.

      See `Ecto.Repo.get!/3` for full documentation.
      """
      def get!(queryable, id, opts \\ []) do
        unquote(__MODULE__).__exec_local__(:get!, [queryable, id, opts])
      end

      @doc """
      Fetches a single result from the query.

      See `Ecto.Repo.get_by/3` for full documentation.
      """
      def get_by(queryable, clauses, opts \\ []) do
        unquote(__MODULE__).__exec_local__(:get_by, [queryable, clauses, opts])
      end

      @doc """
      Similar to `get_by/3` but raises `Ecto.NoResultsError` if no record was found.

      See `Ecto.Repo.get_by!/3` for full documentation.
      """
      def get_by!(queryable, clauses, opts \\ []) do
        unquote(__MODULE__).__exec_local__(:get_by!, [queryable, clauses, opts])
      end

      @doc """
      Returns the atom name or pid of the current repository.

      See `Ecto.Repo.get_dynamic_repo/0` for full documentation.
      """
      @spec get_dynamic_repo() :: Keyword.t()
      def get_dynamic_repo do
        @local_repo.get_dynamic_repo()
      end

      @doc """
      Inserts a struct defined via Ecto.Schema or a changeset.

      See `Ecto.Repo.insert/2` for full documentation.
      """
      def insert(struct_or_changeset, opts \\ []) do
        unquote(__MODULE__).__exec_on_primary__(:insert, [struct_or_changeset, opts], opts)
      end

      @doc """
      Same as `insert/2` but returns the struct or raises if the changeset is invalid.

      See `Ecto.Repo.insert!/2` for full documentation.
      """
      def insert!(struct_or_changeset, opts \\ []) do
        unquote(__MODULE__).__exec_on_primary__(:insert!, [struct_or_changeset, opts], opts)
      end

      @doc """
      Inserts all entries into the repository.

      See `Ecto.Repo.insert_all/3` for full documentation.
      """
      def insert_all(schema_or_source, entries_or_query, opts \\ []) do
        unquote(__MODULE__).__exec_on_primary__(
          :insert_all,
          [
            schema_or_source,
            entries_or_query,
            opts
          ],
          opts
        )
      end

      @doc """
      Inserts or updates a changeset depending on whether the struct is persisted or not

      See `Ecto.Repo.insert_or_update/2` for full documentation.
      """
      def insert_or_update(changeset, opts \\ []) do
        unquote(__MODULE__).__exec_on_primary__(:insert_or_update, [changeset, opts], opts)
      end

      @doc """
      Same as `insert_or_update!/2` but returns the struct or raises if the changeset is invalid.

      See `Ecto.Repo.insert_or_update!/2` for full documentation.
      """
      def insert_or_update!(changeset, opts \\ []) do
        unquote(__MODULE__).__exec_on_primary__(:insert_or_update!, [changeset, opts], opts)
      end

      @doc """
      Fetches a single result from the query.

      See `Ecto.Repo.one/2` for full documentation.
      """
      def one(queryable, opts \\ []) do
        unquote(__MODULE__).__exec_local__(:one, [queryable, opts])
      end

      @doc """
      Similar to a `one/2` but raises Ecto.NoResultsError if no record was found.

      See `Ecto.Repo.one!/2` for full documentation.
      """
      def one!(queryable, opts \\ []) do
        unquote(__MODULE__).__exec_local__(:one!, [queryable, opts])
      end

      @doc """
      Preloads all associations on the given struct or structs.

      See `Ecto.Repo.preload/3` for full documentation.
      """
      def preload(structs_or_struct_or_nil, preloads, opts \\ []) do
        unquote(__MODULE__).__exec_local__(:preload, [
          structs_or_struct_or_nil,
          preloads,
          opts
        ])
      end

      @doc """
      A user customizable callback invoked for query-based operations.

      See `Ecto.Repo.preload/3` for full documentation.
      """
      def prepare_query(operation, query, opts \\ []) do
        unquote(__MODULE__).__exec_local__(:prepare_query, [operation, query, opts])
      end

      @doc """
      Sets the dynamic repository to be used in further interactions.

      See `Ecto.Repo.put_dynamic_repo/1` for full documentation.
      """
      def put_dynamic_repo(name_or_pid) do
        unquote(__MODULE__).__exec_local__(:put_dynamic_repo, [name_or_pid])
      end

      @doc """
      The same as `query`, but raises on invalid queries.

      See `Ecto.Adapters.SQL.query/4` for full documentation.
      """
      def query(query, params \\ [], opts \\ []) do
        unquote(__MODULE__).__exec_local__(:query, [query, params, opts])
      end

      @doc """
      Run a custom SQL query.

      See `Ecto.Adapters.SQL.query!/4` for full documentation.
      """
      def query!(query, params \\ [], opts \\ []) do
        unquote(__MODULE__).__exec_local__(:query!, [query, params, opts])
      end

      @doc """
      Reloads a given schema or schema list from the database.

      See `Ecto.Repo.reload/2` for full documentation.
      """
      def reload(struct_or_structs, opts \\ []) do
        unquote(__MODULE__).__exec_local__(:reload, [struct_or_structs, opts])
      end

      @doc """
      Similar to `reload/2`, but raises when something is not found.

      See `Ecto.Repo.reload!/2` for full documentation.
      """
      def reload!(struct_or_structs, opts \\ []) do
        unquote(__MODULE__).__exec_local__(:reload!, [struct_or_structs, opts])
      end

      @doc """
      Rolls back the current transaction.

      Defaults to the local database repo. When managing transactions explicitly,
      they are always done on the local repository. If data modifications will be made,
      then it is suggested to use `Fly.Postgres.rpc_and_wait/4` to perform the
      operation in the primary region.

      See `Ecto.Repo.rollback/1` for full documentation.
      """
      def rollback(value) do
        unquote(__MODULE__).__exec_local__(:rollback, [value])
      end

      @doc """
      Returns a lazy enumerable that emits all entries from the data store matching the given query.

      See `Ecto.Repo.stream/2` for full documentation.
      """
      def stream(queryable, opts \\ []) do
        unquote(__MODULE__).__exec_local__(:stream, [queryable, opts])
      end

      @doc """
      Runs the given function or Ecto.Multi inside a transaction.

      Defaults to the local database repo. When managing transactions explicitly,
      they are always done on the local repository. If data modifications will be made,
      then it is suggested to use `Fly.Postgres.rpc_and_wait/4` to perform the
      operation in the primary region.

      See `Ecto.Repo.transaction/2` for full documentation.
      """
      def transaction(fun_or_multi, opts \\ []) do
        unquote(__MODULE__).__exec_local__(:transaction, [fun_or_multi, opts])
      end

      @doc """
      Updates a changeset using its primary key.

      See `Ecto.Repo.update/2` for full documentation.
      """
      def update(changeset, opts \\ []) do
        unquote(__MODULE__).__exec_on_primary__(:update, [changeset, opts], opts)
      end

      @doc """
      Same as `update/2` but returns the struct or raises if the changeset is invalid.

      See `Ecto.Repo.update!/2` for full documentation.
      """
      def update!(changeset, opts \\ []) do
        unquote(__MODULE__).__exec_on_primary__(:update!, [changeset, opts], opts)
      end

      @doc """
      Updates all entries matching the given query with the given values.

      See `Ecto.Repo.update_all/3` for full documentation.
      """
      def update_all(queryable, updates, opts \\ []) do
        unquote(__MODULE__).__exec_on_primary__(:update_all, [queryable, updates, opts], opts)
      end

      def __exec_local__(func, args) do
        apply(@local_repo, func, args)
      end

      def __exec_on_primary__(func, args, opts) do
        # Default behavior is to wait for replication. If `:await` is set to
        # false/falsey then skip the LSN query and waiting for replication.
        if Keyword.get(opts, :await, true) do
          rpc_timeout = Keyword.get(opts, :rpc_timeout, @timeout)
          replication_timeout = Keyword.get(opts, :replication_timeout, @replication_timeout)

          Fly.Postgres.rpc_and_wait(@local_repo, func, args,
            rpc_timeout: rpc_timeout,
            replication_timeout: replication_timeout,
            tracker: Keyword.get(opts, :tracker)
          )
        else
          Fly.RPC.rpc_primary({@local_repo, func, args}, timeout: @timeout)
        end
      end
    end
  end
end