
defmodule Exandra do
  @moduledoc """
  Adapter module for Apache Cassandra
  and ScyllaDB.

  Uses `Xandra` for communication with the
  underlying database.

  Uses `Ecto` for database interfacing,
  running schema migrations, and querying operations.

  ## Configuration

  To configure an `Ecto.Repo` that uses `Exandra` as its adapter, you can use
  the application configuration or pass the options when starting the repo.

  You can use the following options:

    * Any of the options supported by `Ecto.Repo` itself, which you can see
      in the `Ecto.Repo` documentation.

    * Any of the options supported by `Xandra.Cluster.start_link/1`.
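
  As a minimal sketch of passing options when starting the repo, they can be given in the
  supervision tree. The option values are only illustrative, and `MyApp.Repo` is assumed
  to be a repo defined with `adapter: Exandra`:

  ```elixir
  # Options given here are passed to MyApp.Repo.start_link/1.
  children = [
    {MyApp.Repo, keyspace: "my_app_dev", pool_size: 10}
  ]

  Supervisor.start_link(children, strategy: :one_for_one)
  ```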


  To configure your Ecto repository to use this adapter, you can use the
  `:adapter` option. For example, when defining the repo:

      defmodule MyApp.Repo do
        use Ecto.Repo, otp_app: :my_app, adapter: Exandra
      end

  You can configure your database connection in `config/dev.exs`. Here's an example
  development configuration:

      # Configure your database
      config :my_app, MyApp.Repo,
        # Overrides the default `bigserial` type used for the version attribute in schema migrations
        migration_primary_key: [name: :id, type: :uuid],
        # List of database connection endpoints
        nodes: [""],
        # Name of your keyspace
        keyspace: "my_app_dev",
        # Time to wait for the database connection, in milliseconds
        sync_connect: 5000,
        log: :info,
        stacktrace: true,
        show_sensitive_data_on_connection_error: true,
        pool_size: 10

  **Note:** The `bigserial` data type is specific to PostgreSQL databases and is not present in Scylla/Cassandra.

  ## Schemas

  You can use `Ecto.Schema` with Exandra as usual. For example:

      defmodule User do
        use Ecto.Schema

        @primary_key {:id, Ecto.UUID, autogenerate: true}
        schema "users" do
          field :email, :string
          field :meta, Exandra.Map, key: :string, value: :string
        end
      end

  You can use all the usual types (`:string`, `Ecto.UUID`, and so on).

  ### Maps

  The `:map` type gets stored in Cassandra/Scylla as a blob of text with the map encoded as
  JSON. For example, if you have a schema with

      field :features, :map

  you can pass the field as an Elixir map when setting it. Exandra encodes it as JSON when
  writing and decodes it back to a map when reading from the database. Because of the JSON
  round trip, pay attention to things such as atom keys, which can be used when writing but
  come back as strings when reading.
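
  To illustrate the round trip, here is a small sketch that encodes and decodes a map by hand.
  It assumes the default `Jason` library is available and only mimics what happens to the
  keys and values; it is not Exandra's internal code path.

  ```elixir
  # In a standalone script, Jason can be fetched first with: Mix.install([:jason])
  features = %{dark_mode: true, plan: :pro}

  # Writing: the map is serialized to a JSON string.
  stored = Jason.encode!(features)

  # Reading: the JSON string is decoded back into a map with string keys.
  loaded = Jason.decode!(stored)

  # Atom keys and atom values (other than booleans and nil) come back as strings.
  true = loaded == %{"dark_mode" => true, "plan" => "pro"}
  ```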

  ### User-Defined Types (UDTs)

  If one of your fields is a UDT, you can use the `Exandra.UDT` type for it. For example, if you
  have a `phone_number` UDT, you can declare fields with that type as:

      field :home_phone, Exandra.UDT, type: :phone_number
      field :office_phone, Exandra.UDT, type: :phone_number

  > #### String Keys {: .warning}
  > There is no validation with `Exandra.UDT` and the keys _must_ be strings.
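
  If your data has atom keys, one option is to convert them before setting the field. The
  helper below is a hypothetical sketch (`MyApp.UDTKeys` is not part of Exandra), and the
  `phone_number` field names are made up for illustration:

  ```elixir
  defmodule MyApp.UDTKeys do
    # Hypothetical helper: converts the top-level keys of a map to strings.
    def stringify_keys(map) when is_map(map) do
      Map.new(map, fn {key, value} -> {to_string(key), value} end)
    end
  end

  phone = MyApp.UDTKeys.stringify_keys(%{country_code: "+1", number: "5550100"})
  true = phone == %{"country_code" => "+1", "number" => "5550100"}
  ```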

  Alternatively, you can use the `Exandra.EmbeddedType` for `Ecto.Schema`-backed UDTs. For example, if you have a
  `phone_number` UDT, you can use:

      field :home_phone, Exandra.EmbeddedType, using: MyApp.PhoneSchema

  Finally, if you have a column holding a list of frozen UDTs, such as
  `list<frozen<phone_number>>`, you can still use `Exandra.EmbeddedType` as before,
  adding `cardinality: :many`:

      field :home_phone, Exandra.EmbeddedType, cardinality: :many, using: MyApp.PhoneSchema

  ### Inets

  Cassandra/Scylla has a native `inet` type, which represents either an IPv4 or an IPv6 address.
  Exandra provides the `Exandra.Inet` type for these fields.

      field :last_ip, Exandra.Inet

  ### Tuples

  Tuples can be declared using the `Exandra.Tuple` type.

      field :version, Exandra.Tuple, types: [:integer, :integer, :integer]

  ### Native Collections (Lists, Maps, Sets)

  Access to native Cassandra/Scylla collections is available
  using the appropriate data types in the field definition.

      @primary_key false
      schema "hotels" do
        field :checkins, {:array, :utc_datetime}                            # list<timestamp>
        field :room_to_customer, Exandra.Map, key: :integer, value: :string # map<int, text>
        field :available_rooms, Exandra.Set, type: :integer                 # set<int>
      end

  > #### Composite Collections {: .tip}
  > It's possible to create composite collections using the following syntax:
  > ```elixir
  > field :complex_type, {:array, Exandra.Map}, key: :string, value: Exandra.Set, type: :integer
  > ```
  > This creates a `list<map<text, set<int>>>`.
  > However, please note that due to the limited expressivity of this representation,
  > each collection at the same level will have the same typing information:
  > ```elixir
  > field :valid, Exandra.Set, type: Exandra.Set, type: :integer                                     # set<set<int>>
  > field :invalid, Exandra.Map, key: Exandra.Set, type: :integer, value: Exandra.Set, type: :string # map<set<int>, set<int>> not map<set<int>, set<string>>
  > ```

  ### Counter Tables

  You can use the `Exandra.Counter` type to create counter fields (in counter tables). For
  example:

      @primary_key false
      schema "page_views" do
        field :route, :string, primary_key: true
        field :total, Exandra.Counter
      end

  You can only *update* counter fields. You'll have to use `c:Ecto.Repo.update_all/3`
  to insert or update counters. For example, in the table above, you'd update the
  `:total` counter field with:

      query =
        from page_view in "page_views",
          where: page_view.route == "/browse",
          update: [set: [total: 1]]


  ## Batch Queries

  You can run **batch queries** through Exandra. Batch queries are supported by
  Cassandra/Scylla, and allow you to run multiple queries in a single request.
  See `Exandra.Batch` for more information and examples.

  ## Multiple keyspaces using prefixes

  You can use Ecto query prefixes to
  query different keyspaces using the same schemas. As pointed out in the Ecto docs,
  migrations must be run for *each* prefix in this case.
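
  As a sketch, assuming two keyspaces named `tenant_a` and `tenant_b` (hypothetical names)
  and the `User` schema from the examples above, a prefix can be given per call or set on
  the query itself:

  ```elixir
  import Ecto.Query

  # Per-call prefix: reads from the "users" table in the tenant_a keyspace.
  MyApp.Repo.all(User, prefix: "tenant_a")

  # Per-query prefix: the keyspace travels with the query.
  query = from(u in User, select: u.email)

  query
  |> Ecto.Query.put_query_prefix("tenant_b")
  |> MyApp.Repo.all()
  ```

  Migrations would then be run once per keyspace, for example with
  `mix ecto.migrate --prefix tenant_a`.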

  ## Migrations

  You can use Exandra to run migrations as well, as it supports most of the DDL-related
  commands from `Ecto.Migration`. For example:

      defmodule AddUsers do
        use Ecto.Migration

        def change do
          create table("users", primary_key: false) do
            add :email, :string, primary_key: true
            add :age, :int
          end
        end
      end

  > #### Cassandra and Scylla Types {: .info}
  > When writing migrations, remember that you must use the **actual types** from Cassandra or
  > Scylla, which you must pass in as an *atom*.
  > For example, to add a column with the type of
  > a map of integer keys to boolean values, you need to declare its type as
  > `:"map<int, boolean>"`.

  This is a non-comprehensive list of types you can use:

    * `:"map<key_type, value_type>"` - maps (such as `:"map<int, boolean>"`).
    * `:"list<type>"` - lists (such as `:"list<uuid>"`).
    * `:string` - gets translated to the `text` type.
    * `:map` - maps get stored as text, and Exandra dumps and loads them automatically.
    * `<udt>` - User-Defined Types (UDTs) should be specified as their name, expressed as an
      atom. For example, a UDT called `full_name` would be specified as the type `:full_name`.
    * `:naive_datetime`, `:naive_datetime_usec`, `:utc_datetime`, `:utc_datetime_usec` -
      these are all represented as the `timestamp` type.
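
  Putting several of these together, a migration could look like the sketch below. The table
  and column names are hypothetical, and the `full_name` UDT is assumed to already exist in
  the keyspace:

  ```elixir
  defmodule MyApp.Repo.Migrations.CreateCustomers do
    use Ecto.Migration

    def change do
      create table("customers", primary_key: false) do
        add :id, :uuid, primary_key: true
        add :email, :string                          # text
        add :name, :full_name                        # UDT, referenced by name
        add :preferences, :map                       # stored as JSON text
        add :room_flags, :"map<int, boolean>"        # native map
        add :friend_ids, :"list<uuid>"               # native list
        add :inserted_at, :utc_datetime              # timestamp
      end
    end
  end
  ```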

  ### User-Defined Types (UDTs)

  `Ecto.Migration` doesn't support creating, altering, or dropping Cassandra/Scylla **UDTs**.
  To do those operations in a migration, use `Ecto.Migration.execute/1`
  or `Ecto.Migration.execute/2`. For example, in your migration module:

      def change do
        execute(
          _up_query = "CREATE TYPE full_name (first_name text, last_name text)",
          _down_query = "DROP TYPE full_name"
        )
      end
  """


  # This overrides the checkout/3 function defined in Ecto.Adapters.SQL. That function
  # is a private API, and is defined by "use Ecto.Adapters.SQL". This is why this function
  # needs to appear BEFORE the call to "use Ecto.Adapters.SQL", so that we essentially
  # override the call. We'll work with the Ecto team to make that callback overridable.
  @doc false
  @spec checkout(map(), keyword(), (pid() -> result)) :: result when result: var
  def checkout(%{sql: conn_mod, pid: cluster} = meta, opts, fun) when is_function(fun, 1) do
    conn_mod.checkout(cluster, fun, Keyword.merge(opts, meta.opts))

  use Ecto.Adapters.SQL, driver: :exandra

  @behaviour Ecto.Adapter.Storage

  @xandra_mod Application.compile_env(:exandra, :xandra_module, Xandra)

  @xandra_cluster_mod Application.compile_env(

  @doc false
  defmacro embedded_type(field, embedded_schema, opts \\ []) do
    quote do
      opts = Keyword.merge([using: unquote(embedded_schema)], unquote(opts))
      field unquote(field), Exandra.EmbeddedType, opts

  @doc """
  Executes a **batch query**.

  See `Exandra.Batch`.

  ## Examples

      queries = [
        {"INSERT INTO users (email) VALUES (?)", [""]},
        {"INSERT INTO users (email) VALUES (?)", [""]}
      ]

      Exandra.execute_batch(MyApp.Repo, %Exandra.Batch{queries: queries})
  """

  @spec execute_batch(Ecto.Repo.t(), Exandra.Batch.t(), keyword()) ::
          :ok | {:error, Exception.t()}
  def execute_batch(repo, %Exandra.Batch{queries: queries} = _batch, options \\ [])
      when is_atom(repo) and is_list(options) do
    {prepare_options, execute_options} =

    fun = fn conn ->
      try do
        # First, prepare all queries (the order doesn't matter).
        prepared_queries =
          queries
          |> Enum.uniq_by(fn {sql, _values} -> sql end)
          |> Enum.reduce(%{}, fn {sql, _values}, acc ->
            case @xandra_mod.prepare(conn, sql, prepare_options) do
              {:ok, %Xandra.Prepared{} = prepared} -> Map.put(acc, sql, prepared)
              {:error, reason} -> throw({:prepare_error, reason})

        # Now, add them all to a Xandra.Batch and execute the batch.
        batch =
          Enum.reduce(queries, Xandra.Batch.new(), fn {sql, values}, batch ->
            Xandra.Batch.add(batch, Map.fetch!(prepared_queries, sql), values)

        case @xandra_mod.execute(conn, batch, execute_options) do
          {:ok, %Xandra.Void{}} -> :ok
          {:error, reason} -> {:error, reason}
        {:prepare_error, reason} -> {:error, reason}

    repo.checkout(fun, options)

  @doc false
  def autogenerate(Ecto.UUID), do: Ecto.UUID.generate()
  def autogenerate(:binary_id), do: Ecto.UUID.bingenerate()
  def autogenerate(type), do: super(type)

  @doc false
  @impl Ecto.Adapter
  def dumpers(:binary_id, type), do: [type, Ecto.UUID]
  def dumpers(:map, _type), do: [&encode_json/1]
  def dumpers(:naive_datetime, _type), do: [&naive_datetime_to_datetime/1]
  def dumpers(_val, type), do: [type]

  @doc false
  @impl Ecto.Adapter
  def loaders(:binary_id, _type), do: []
  def loaders(:exandra_map, type), do: [&Ecto.Type.embedded_load(type, &1, :exandra_map), type]
  def loaders(:exandra_set, type), do: [&Ecto.Type.embedded_load(type, &1, :exandra_set), type]

  def loaders({:map, _}, type),
    do: [&decode_json/1, &Ecto.Type.embedded_load(type, &1, :map), type]

  def loaders(:map, type), do: [&decode_json/1, type]
  # Xandra returns UUIDs as strings, so we don't need to do any loading.
  def loaders(:uuid, _type), do: []
  def loaders(:decimal, type), do: [&decimal_decode/1, type]
  def loaders(_, type), do: [type]

  defp decimal_decode({coefficient, exponent}) do
    sign = if coefficient < 0, do: -1, else: 1
    {:ok, Decimal.new(sign, abs(coefficient), -exponent)}

  defp decimal_decode(nil), do: {:ok, nil}

  defp decode_json(data) when is_binary(data), do: {:ok, json_library().decode!(data)}
  defp decode_json(data), do: {:ok, data}

  defp encode_json(data) do
    {:ok, json_library().encode!(data)}

  defp json_library(), do: Application.get_env(:exandra, :json_library, Jason)

  defp naive_datetime_to_datetime(%NaiveDateTime{} = datetime) do
    case DateTime.from_naive(datetime, "Etc/UTC") do
      {:ok, datetime} -> {:ok, datetime}
      {:ambiguous, _first, _second} -> {:error, :ambiguous}
      {:gap, _, _} -> {:error, :gap}
      {:error, reason} -> {:error, reason}

  @impl Ecto.Adapter.Migration
  def lock_for_migrations(_, _, fun), do: fun.()

  @impl Ecto.Adapter.Storage
  def storage_up(opts) do
    {keyspace, conn} = start_storage_connection(opts)

    stmt = """
    CREATE KEYSPACE IF NOT EXISTS #{keyspace}
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'}
    AND durable_writes = true;
    """

    storage_toggle(conn, stmt, "CREATED", :already_up)

  @impl Ecto.Adapter.Storage
  def storage_down(opts) do
    {keyspace, conn} = start_storage_connection(opts)

    storage_toggle(conn, "DROP KEYSPACE IF EXISTS #{keyspace}", "DROPPED", :already_down)

  defp storage_toggle(conn, stmt, effect, error_msg) do
    case @xandra_mod.execute(conn, stmt) do
      {:ok, %Xandra.SchemaChange{effect: ^effect}} ->

      {:ok, %Xandra.Void{}} ->
        {:error, error_msg}

      err ->

  @impl Ecto.Adapter.Storage
  def storage_status(opts) do
    {keyspace, conn} = start_storage_connection(opts)

    stmt = "USE KEYSPACE #{keyspace}"

    case @xandra_mod.execute(conn, stmt) do
      {:error, %Xandra.Error{reason: :invalid}} ->

      {:error, _reason} = err ->

      _ ->

  @impl Ecto.Adapter.Migration
  def supports_ddl_transaction?, do: false

  defp start_storage_connection(opts) do
    keyspace = Keyword.fetch!(opts, :keyspace)

    {:ok, conn} =
      @xandra_mod.start_link(Keyword.take(opts, [:nodes, :protocol_version, :connect_timeout]))

    {keyspace, conn}

  @doc """
  Streams the results of a simple query or a prepared query.

  See `Xandra.prepare!/3` and `Xandra.stream_pages!/4` for more information, including
  supported options.

  ## Examples

      stream =
        Exandra.stream!(
          MyApp.Repo,
          "SELECT * FROM USERS WHERE can_contact = ?",
          [true]
        )

      Enum.each(stream, fn page ->
        Enum.each(page, fn user -> send_email(user["email"], "Hello!") end)
      end)
  """
  @spec stream!(Ecto.Repo.t(), String.t(), list(term()), Keyword.t()) :: Xandra.PageStream.t()
  def stream!(repo, sql, values, opts \\ []) do
    %{pid: cluster_pid} = Ecto.Repo.Registry.lookup(repo.get_dynamic_repo())

    {prepare_opts, execute_opts} =

    prepared = @xandra_cluster_mod.prepare!(cluster_pid, sql, prepare_opts)


defimpl String.Chars, for: [Xandra.Simple, Xandra.Prepared, Xandra.Batch] do
  def to_string(simple) do