lib/exandra.ex

defmodule Exandra do
  @moduledoc """
  Adapter module for [Apache Cassandra](https://cassandra.apache.org/_/index.html)
  and [ScyllaDB](https://www.scylladb.com/).

  Uses [`Xandra`](https://github.com/lexhide/xandra) for communication with the
  underlying database.

  Uses [`Ecto`](https://github.com/elixir-ecto/ecto) for database interfacing,
  running schema migrations, and querying operations.

  ## Configuration

  To configure an `Ecto.Repo` that uses `Exandra` as its adapter, you can use
  the application configuration or pass the options when starting the repo.

  You can use the following options:

    * Any of the options supported by `Ecto.Repo` itself, which you can see
      in the `Ecto.Repo` documentation.

    * Any of the options supported by `Xandra.Cluster.start_link/1`.

  #{Exandra.Connection.start_opts_docs()}

  To configure your Ecto repository to use this adapter, you can use the
  `:adapter` option. For example, when defining the repo:

      defmodule MyApp.Repo do
        use Ecto.Repo, otp_app: :my_app, adapter: Exandra
      end

  You can configure your database connection in `config/dev.exs`. Here's an example `dev configuration`:

      # Configure your database
      config :my_app, MyApp.Repo,
        migration_primary_key: [name: :id, type: :binary_id], # Overrides the default type `bigserial` used for version attribute in schema migration
        contact_points: ["127.0.0.1"],  # List of database connection endpoints
        keyspace: "my_app_dev", # Name of your keyspace
        port: 9042,                     # Default port
        sync_connect: 5000,             # Waiting time in milliseconds for the database connection
        log: :info,
        stacktrace: true,
        show_sensitive_data_on_connection_error: true,
        pool_size: 10

  **Note:** The `bigserial` data type is specific to PostgreSQL databases and is not present in Scylla/Cassandra.


  ## Schemas

  You can regularly use `Ecto.Schema` with Exandra. For example:

      defmodule User do
        use Ecto.Schema

        @primary_key {:id, :binary_id, autogenerate: true}
        schema "users" do
          field :email, :string
          field :meta, Exandra.Map, key: :string, value: :string
        end
      end

  You can use all the usual types (`:string`, `Ecto.UUID`, and so on).

  ### Maps

  The `:map` type gets stored in Cassandra/Scylla as a blob of text with the map encoded as
  JSON. For example, if you have a schema with

      field :features, :map

  you can pass the field as an Elixir map when setting it, and Exandra will convert it to a map
  on the way from the database. Because Exandra uses JSON for this, you'll have to pay attention
  to things such as atom keys (which can be used when writing, but will be strings when reading)
  and such.

  ### User-Defined Types (UDTs)

  If one of your fields is a UDT, you can use the `Exandra.UDT` type for it. For example, if you
  have a `phone_number` UDT, you can declare fields with that type as:

      field :home_phone, Exandra.UDT, type: :phone_number
      field :office_phone, Exandra.UDT, type: :phone_number

  > #### String Keys {: .warning}
  >
  > There is no validation with `Exandra.UDT` and the keys _must_ be strings.

  Alternatively, you can use the `Exandra.EmbeddedType` for `Ecto.Schema`-backed UDTs. For example, if you have a
  `phone_number` UDT, you can use:

      field :home_phone, Exandra.EmbeddedType, using: MyApp.PhoneSchema

  Finally, if you have a column of frozen UDTs `list<frozen<phone_number>>`, you can still use the
  `Exandra.EmbeddedType` just as before with `cardinality: :many` like so:

      field :home_phone, Exandra.EmbeddedType, cardinality: :many, using: MyApp.PhoneSchema

  ### Arrays

  You can use arrays with the Ecto `{:array, <type>}` type. This gets translated to the
  `list<_>` native Cassandra/Scylla type. For example, you can declare a field as

      field :checkins, {:array, :utc_datetime}

  This field will use the native type `list<timestamp>`.

  > #### Exandra Types {: .tip}
  >
  > If you want to use actual Cassandra/Scylla types such as `map<_, _>` or
  > `set<_>`, you can use the corresponding Exandra types `Exandra.Map` and `Exandra.Set`.

  ### Counter Tables

  You can use the `Exandra.Counter` type to create counter fields (in counter tables). For
  example:

      @primary_key false
      schema "page_views" do
        field :route, :string, primary_key: true
        field :total, Exandra.Counter
      end

  You can only *update* counter fields. You'll have to use `c:Ecto.Repo.update_all/2`
  to insert or update counters. For example, in the table above, you'd update the
  `:total` counter field with:

      query =
        from page_view in "page_views",
          where: page_view.route == "/browse",
          update: [set: [total: 1]]

      MyApp.Repo.update_all(query)

  ## Batch Queries

  You can run **batch queries** through Exandra. Batch queries are supported by
  Cassandra/Scylla, and allow you to run multiple queries in a single request.
  See `Exandra.Batch` for more information and examples.

  ## Multiple keyspaces using prefixes

  You can use [query prefixes](https://hexdocs.pm/ecto/multi-tenancy-with-query-prefixes.html) to
  query different keyspaces using the same schemas. As pointed out in the Ecto docs,
  migrations must be run for *each* prefix in this case.

  ## Migrations

  You can use Exandra to run migrations as well, as it supports most of the DDL-related
  commands from `Ecto.Migration`. For example:

      defmodule AddUsers do
        use Ecto.Migration

        def change do
          create table("users", primary_key: false) do
            add :email, :string, primary_key: true
            add :age, :int
          end
        end
      end

  > #### Cassandra and Scylla Types {: .info}
  >
  > When writing migrations, remember that you must use the **actual types** from Cassandra or
  > Scylla, which you must pass in as an *atom*.
  >
  > For example, to add a column with the type of
  > a map of integer keys to boolean values, you need to declare its type as
  > `:"map<int, boolean>"`.

  This is a non-comprehensive list of types you can use:

    * `:"map<key_type, value_type>"` - maps (such as `:"map<int, boolean>"`).
    * `:"list<type>"` - lists (such as `:"list<uuid>"`).
    * `:string` - gets translated to the `text` type.
    * `:map` - maps get stored as text, and Exandra dumps and loads them automatically.
    * `<udt>` - User-Defined Types (UDTs) should be specified as their name, expressed as an
      atom. For example, a UDT called `full_name` would be specified as the type `:full_name`.
    * `:naive_datetime`, `:naive_datetime_usec`, `:utc_datetime`, `:utc_datetime_usec` -
      these are all represented as the `timestamp` type.

  ### User-Defined Types (UDTs)

  `Ecto.Migration` doesn't support creating, altering, or dropping Cassandra/Scylla **UDTs**.
  To do those operations in a migration, use `Ecto.Migration.execute/1`
  or `Ecto.Migration.execute/2`. For example, in your migration module:

      def change do
        execute(
          _up_query = "CREATE TYPE full_name (first_name text, last_name text))",
          _down_query = "DROP TYPE full_name"
        )
      end

  """

  # This overrides the checkout/3 function defined in Ecto.Adapters.SQL. That function
  # is a private API, and is defined by "use Ecto.Adapters.SQL". This is why this function
  # needs to appear BEFORE the call to "use Ecto.Adapters.SQL", so that we essentially
  # override the call. We'll work with the Ecto team to make that callback overridable.
  @doc false
  @spec checkout(map(), keyword(), (pid() -> result)) :: result when result: var
  def checkout(%{sql: conn_mod, pid: cluster} = meta, opts, fun) when is_function(fun, 1) do
    conn_mod.checkout(cluster, fun, Keyword.merge(opts, meta.opts))
  end

  use Ecto.Adapters.SQL, driver: :exandra

  @behaviour Ecto.Adapter.Storage

  @xandra_mod Application.compile_env(:exandra, :xandra_module, Xandra)

  @xandra_cluster_mod Application.compile_env(
                        :exandra,
                        :xandra_cluster_module,
                        Xandra.Cluster
                      )

  @doc false
  defmacro embedded_type(field, embedded_schema, opts \\ []) do
    quote do
      opts = Keyword.merge([using: unquote(embedded_schema)], unquote(opts))
      field unquote(field), Exandra.EmbeddedType, opts
    end
  end

  @doc """
  Executes a **batch query**.

  See `Exandra.Batch`.

  ## Examples

      queries = [
        {"INSERT INTO users (email) VALUES (?)", ["jeff@example.com"]},
        {"INSERT INTO users (email) VALUES (?)", ["britta@example.com"]}
      ]

      Exandra.execute_batch(MyApp.Repo, %Exandra.Batch{queries: queries})

  """
  @spec execute_batch(Ecto.Repo.t(), Exandra.Batch.t(), keyword()) ::
          :ok | {:error, Exception.t()}
  def execute_batch(repo, %Exandra.Batch{queries: queries} = _batch, options \\ [])
      when is_atom(repo) and is_list(options) do
    {prepare_options, execute_options} =
      Exandra.Connection.split_prepare_and_execute_options(options)

    fun = fn conn ->
      try do
        # First, prepare all queries (doesn't matter the order).
        prepared_queries =
          queries
          |> Enum.uniq_by(fn {sql, _values} -> sql end)
          |> Enum.reduce(%{}, fn {sql, _values}, acc ->
            case @xandra_mod.prepare(conn, sql, prepare_options) do
              {:ok, %Xandra.Prepared{} = prepared} -> Map.put(acc, sql, prepared)
              {:error, reason} -> throw({:prepare_error, reason})
            end
          end)

        # Now, add them all to a Xandra.Batch and execute the batch.
        batch =
          Enum.reduce(queries, Xandra.Batch.new(), fn {sql, values}, batch ->
            Xandra.Batch.add(batch, Map.fetch!(prepared_queries, sql), values)
          end)

        case @xandra_mod.execute(conn, batch, execute_options) do
          {:ok, %Xandra.Void{}} -> :ok
          {:error, reason} -> {:errror, reason}
        end
      catch
        {:prepare_error, reason} -> {:error, reason}
      end
    end

    repo.checkout(fun, options)
  end

  @doc false
  def autogenerate(Ecto.UUID), do: Ecto.UUID.generate()
  def autogenerate(:binary_id), do: Ecto.UUID.bingenerate()
  def autogenerate(type), do: super(type)

  @doc false
  @impl Ecto.Adapter
  def dumpers(:binary_id, type), do: [type, Ecto.UUID]
  def dumpers(:map, _type), do: [&encode_json/1]
  def dumpers(:naive_datetime, _type), do: [&naive_datetime_to_datetime/1]
  def dumpers(_val, type), do: [type]

  @doc false
  @impl Ecto.Adapter
  def loaders(:binary_id, _type), do: []
  def loaders(:exandra_map, type), do: [&Ecto.Type.embedded_load(type, &1, :exandra_map), type]
  def loaders(:exandra_set, type), do: [&Ecto.Type.embedded_load(type, &1, :exandra_set), type]

  def loaders({:map, _}, type),
    do: [&decode_json/1, &Ecto.Type.embedded_load(type, &1, :map), type]

  def loaders(:map, type), do: [&decode_json/1, type]
  # Xandra returns UUIDs as strings, so we don't need to do any loading.
  def loaders(:uuid, _type), do: []
  def loaders(:decimal, type), do: [&decimal_decode/1, type]
  def loaders(_, type), do: [type]

  defp decimal_decode({coefficient, exponent}) do
    sign = if coefficient < 0, do: -1, else: 1
    {:ok, Decimal.new(sign, abs(coefficient), -exponent)}
  end

  defp decimal_decode(nil), do: {:ok, nil}

  defp decode_json(data) when is_binary(data), do: {:ok, json_library().decode!(data)}
  defp decode_json(data), do: {:ok, data}

  defp encode_json(data) do
    {:ok, json_library().encode!(data)}
  end

  defp json_library(), do: Application.get_env(:exandra, :json_library, Jason)

  defp naive_datetime_to_datetime(%NaiveDateTime{} = datetime) do
    case DateTime.from_naive(datetime, "Etc/UTC") do
      {:ok, datetime} -> {:ok, datetime}
      {:ambiguous, _first, _second} -> {:error, :ambiguous}
      {:gap, _, _} -> {:error, :gap}
      {:error, reason} -> {:error, reason}
    end
  end

  @impl Ecto.Adapter.Migration
  def lock_for_migrations(_, _, fun), do: fun.()

  @impl Ecto.Adapter.Storage
  def storage_up(opts) do
    {keyspace, conn} = start_storage_connection(opts)

    # https://university.scylladb.com/courses/scylla-essentials-overview/lessons/high-availability/topic/fault-tolerance-replication-factor/
    stmt = """
    CREATE KEYSPACE IF NOT EXISTS #{keyspace}
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'}
    AND durable_writes = true;
    """

    storage_toggle(conn, stmt, "CREATED", :already_up)
  end

  @impl Ecto.Adapter.Storage
  def storage_down(opts) do
    {keyspace, conn} = start_storage_connection(opts)

    storage_toggle(conn, "DROP KEYSPACE IF EXISTS #{keyspace}", "DROPPED", :already_down)
  end

  defp storage_toggle(conn, stmt, effect, error_msg) do
    case @xandra_mod.execute(conn, stmt) do
      {:ok, %Xandra.SchemaChange{effect: ^effect}} ->
        :ok

      {:ok, %Xandra.Void{}} ->
        {:error, error_msg}

      err ->
        err
    end
  end

  @impl Ecto.Adapter.Storage
  def storage_status(opts) do
    {keyspace, conn} = start_storage_connection(opts)

    stmt = "USE KEYSPACE #{keyspace}"

    case @xandra_mod.execute(conn, stmt) do
      {:error, %Xandra.Error{reason: :invalid}} ->
        :down

      {:error, _reason} = err ->
        err

      _ ->
        :up
    end
  end

  @impl Ecto.Adapter.Migration
  def supports_ddl_transaction?, do: false

  defp start_storage_connection(opts) do
    keyspace = Keyword.fetch!(opts, :keyspace)
    Application.ensure_all_started(:exandra)

    {:ok, conn} =
      @xandra_mod.start_link(Keyword.take(opts, [:nodes, :protocol_version, :connect_timeout]))

    {keyspace, conn}
  end

  @doc """
  Streams the results of a simple query or a prepared query.

  See `Xandra.prepare!/4` and `Xandra.stream_pages!/4` for more information including
  supported options.

  ## Examples

      stream =
        Exandra.stream!(
          "SELECT * FROM USERS WHERE can_contact = ?",
          [true],
          MyRepo,
        )

      Enum.each(
        stream,
        fn page ->
          Enum.each(page, fn user -> send_email(user.email, "Hello!") end)
        end
      )
  """
  @spec stream!(Ecto.Repo.t(), String.t(), list(term()), Keyword.t()) :: Xandra.PageStream.t()
  def stream!(repo, sql, values, opts \\ []) do
    %{pid: cluster_pid} = Ecto.Repo.Registry.lookup(repo.get_dynamic_repo())

    {prepare_opts, execute_opts} =
      Exandra.Connection.split_prepare_and_execute_options(opts)

    prepared = @xandra_cluster_mod.prepare!(cluster_pid, sql, prepare_opts)

    @xandra_cluster_mod.stream_pages!(
      cluster_pid,
      prepared,
      values,
      execute_opts
    )
  end
end

defimpl String.Chars, for: [Xandra.Simple, Xandra.Prepared, Xandra.Batch] do
  def to_string(simple) do
    inspect(simple)
  end
end