Skip to main content

lib/ecto/adapters/quack_db/append_insert.ex

if Code.ensure_loaded?(Ecto.Adapters.SQL) do
  defmodule Ecto.Adapters.QuackDB.AppendInsert do
    @moduledoc false

    import QuackDB.SQL.Fragment, only: [table: 1]

    # Runs an Ecto `insert_all` through QuackDB's append path.
    #
    # Validates the request first (`assert_supported!/6` raises on unsupported
    # input). It then resolves the connection this process already holds (or the
    # pool), puts the header into schema order when possible, and builds the
    # QuackDB options. Finally it dispatches either a direct append or the
    # temp-table + `INSERT ... SELECT` path.
    #
    # Returns `{num_rows, rows}` from the resulting `%QuackDB.Result{}`.
    # Any exception carried in an `{:error, _}` result is re-raised as-is. This is
    # not limited to `QuackDB.Error`, so errors returned through DBConnection
    # surface with their own message instead of a `WithClauseError`.
    def run(adapter_meta, schema_meta, header, rows, on_conflict, returning, placeholders, opts) do
      :ok = assert_supported!(schema_meta, rows, on_conflict, returning, placeholders, opts)

      conn = ecto_connection(adapter_meta)
      append_header = append_header(schema_meta, header)
      options = append_options(schema_meta, append_header, opts)

      case insert_all(conn, schema_meta, append_header, rows, on_conflict, returning, options) do
        {:ok, %QuackDB.Result{} = result} -> {result.num_rows, result.rows}
        {:error, error} when is_exception(error) -> raise error
      end
    end

    # Picks the insert strategy.
    #
    # The staged temp-table + `INSERT ... SELECT` route is used whenever
    # `insert_select?/4` says it is required. Otherwise rows are appended
    # directly into the target table.
    defp insert_all(conn, schema_meta, header, rows, on_conflict, returning, options) do
      needs_select? = insert_select?(schema_meta, header, on_conflict, returning)

      cond do
        needs_select? ->
          insert_select(conn, schema_meta, header, rows, on_conflict, returning, options)

        true ->
          direct_insert_all(conn, schema_meta, header, rows, options)
      end
    end

    # Appends `rows` straight into `schema_meta.source`.
    #
    # `:append_shape` (default `:columns`) selects the columnar or row-oriented
    # QuackDB append call.
    defp direct_insert_all(conn, schema_meta, header, rows, options) do
      shape = Keyword.get(options, :append_shape, :columns)

      case shape do
        :rows ->
          row_payload = append_rows(header, rows, options)
          QuackDB.insert_rows(conn, schema_meta.source, row_payload, options)

        :columns ->
          column_payload = append_columns(header, rows, options)
          QuackDB.insert_columns(conn, schema_meta.source, column_payload, options)
      end
    end

    # Decides whether the insert must be staged through a temp table and copied
    # with `INSERT ... SELECT`.
    #
    # This is required when RETURNING was requested, when ON CONFLICT DO NOTHING
    # was requested, or (schema-backed inserts only) when the header does not
    # name exactly the schema's set of source columns.
    defp insert_select?(schema_meta, header, on_conflict, returning) do
      cond do
        returning != [] ->
          true

        match?({:nothing, _params, _targets}, on_conflict) ->
          true

        match?(%{schema: nil}, schema_meta) ->
          false

        true ->
          MapSet.new(header) != MapSet.new(schema_source_order(schema_meta))
      end
    end

    # Runs the staged insert.
    #
    # When `conn` reports `:idle`, every step runs inside
    # `DBConnection.transaction/3` on the transaction handle, and an
    # `{:error, error}` from the steps rolls back with that error. For any other
    # status (e.g. already in a transaction), the steps run directly on `conn`.
    defp insert_select(conn, schema_meta, header, rows, on_conflict, returning, options) do
      run_steps = fn target ->
        do_insert_select(target, schema_meta, header, rows, on_conflict, returning, options)
      end

      if DBConnection.status(conn, options) == :idle do
        DBConnection.transaction(
          conn,
          fn tx ->
            case run_steps.(tx) do
              {:error, error} -> DBConnection.rollback(tx, error)
              {:ok, result} -> result
            end
          end,
          options
        )
      else
        run_steps.(conn)
      end
    end

    # Stages `rows` in a temporary table named after a hash of its column
    # definitions, then copies them into the target with `INSERT ... SELECT`.
    #
    # The staging table is created with IF NOT EXISTS and emptied before the
    # append. It is emptied again in `after` regardless of outcome; the result of
    # that cleanup query is ignored.
    #
    # Returns `{:ok, %QuackDB.Result{}}` from the final insert, or the first
    # step result that does not match its expected `{:ok, _}` shape.
    defp do_insert_select(conn, schema_meta, header, rows, on_conflict, returning, options) do
      columns = append_columns!(schema_meta, header, options)
      staging = temp_table_name(columns)
      create_sql = create_temp_table(staging, columns)
      delete_sql = clear_temp_table(staging)
      copy_sql = insert_from_temp_statement(schema_meta, columns, staging, on_conflict, returning)

      exec = fn statement -> QuackDB.query(conn, statement, [], options) end

      try do
        with {:ok, _created} <- exec.(create_sql),
             {:ok, _cleared} <- exec.(delete_sql),
             {:ok, _appended} <- append_temp_data(conn, staging, header, rows, options),
             {:ok, %QuackDB.Result{}} = copied <- exec.(copy_sql) do
          copied
        end
      after
        _ = exec.(delete_sql)
      end
    end

    # Deterministic staging-table name: `"quackdb_append_"` followed by the
    # base-36 `:erlang.phash2/2` hash of the column definitions. Identical column
    # sets therefore map to the same name.
    defp temp_table_name(columns) do
      digest = :erlang.phash2(columns, 4_294_967_296)
      "quackdb_append_" <> Integer.to_string(digest, 36)
    end

    # Pairs every header source with its declared append type from
    # `options[:columns]`, producing `%{source: _, type: _}` maps in header order.
    #
    # Raises `KeyError` when `:columns` is absent or has no entry for a header
    # source.
    defp append_columns!(_schema_meta, header, options) do
      declared = Keyword.fetch!(options, :columns)

      for source <- header do
        %{type: column_type!(declared, source), source: source}
      end
    end

    # Normalizes the insert header.
    #
    # Schemaless inserts keep it as given. For schema-backed inserts, a header
    # naming exactly the schema's source columns is replaced by the schema's own
    # field order. Any other header is kept unchanged.
    defp append_header(schema_meta, header) do
      case schema_meta do
        %{schema: nil} ->
          header

        _schema_backed ->
          ordered = schema_source_order(schema_meta)
          same_columns? = MapSet.new(header) == MapSet.new(ordered)
          if same_columns?, do: ordered, else: header
      end
    end

    # Source (column) names of the schema's fields, in `__schema__(:fields)` order.
    defp schema_source_order(%{schema: schema}) do
      for field <- schema.__schema__(:fields) do
        schema.__schema__(:field_source, field)
      end
    end

    # DDL for the staging table.
    #
    # Turns the `%{source: _, type: _}` column maps into `{source, type}` pairs
    # and calls `QuackDB.DDL.create_table/3` with `temporary: true` and
    # `if_not_exists: true`.
    defp create_temp_table(temp_table, columns) do
      definitions = Enum.map(columns, &{&1.source, &1.type})
      QuackDB.DDL.create_table(temp_table, definitions, temporary: true, if_not_exists: true)
    end

    # iodata for `DELETE FROM <temp_table>`; the name is rendered by `table/1`.
    defp clear_temp_table(temp_table) do
      quoted_name = table(temp_table)
      ["DELETE FROM ", quoted_name]
    end

    # Looks up the declared type for `column` in a `[{name, type}]` list.
    #
    # Names match exactly, or across atom/string forms (`:id` matches `"id"` and
    # vice versa). Entries that are not 2-tuples are skipped. The first match
    # wins.
    #
    # Raises `KeyError` when no entry matches.
    defp column_type!(columns, column) do
      lookup = fn
        {name, type} ->
          if column_name_matches?(name, column), do: {:ok, type}

        _not_a_pair ->
          nil
      end

      case Enum.find_value(columns, lookup) do
        nil -> raise KeyError, key: column, term: columns
        {:ok, type} -> type
      end
    end

    # Strict equality, or equality after converting the atom side to a string.
    defp column_name_matches?(same, same), do: true

    defp column_name_matches?(name, column) when is_atom(name) and is_binary(column),
      do: Atom.to_string(name) == column

    defp column_name_matches?(name, column) when is_binary(name) and is_atom(column),
      do: name == Atom.to_string(column)

    defp column_name_matches?(_name, _column), do: false

    # Appends `rows` into the staging temp table.
    #
    # `:append_shape` (default `:columns`) selects the columnar or row-oriented
    # QuackDB append call.
    #
    # The `:schema` option is dropped first. It carries the Ecto prefix of the
    # *target* table (`direct_insert_all/5` passes it alongside the unqualified
    # source name). The staging table, however, is created temporary and
    # unqualified, so qualifying it with the target's schema would point the
    # appender at a table that does not exist whenever a prefix is set.
    # NOTE(review): assumes `QuackDB.insert_columns/4` / `insert_rows/4` treat
    # `:schema` as the table's schema, as the direct path relies on.
    defp append_temp_data(conn, temp_table, header, rows, options) do
      temp_options = Keyword.delete(options, :schema)

      case Keyword.get(temp_options, :append_shape, :columns) do
        :columns ->
          columns = append_columns(header, rows, temp_options)
          QuackDB.insert_columns(conn, temp_table, columns, temp_options)

        :rows ->
          rows = append_rows(header, rows, temp_options)
          QuackDB.insert_rows(conn, temp_table, rows, temp_options)
      end
    end

    # Builds the statement that copies the staged columns from `temp_table` into
    # `{prefix, source}`, via `QuackDB.DML.insert_into_select/5`.
    #
    # The same column list is used on both sides. The translated ON CONFLICT
    # action and the RETURNING list are passed as options.
    defp insert_from_temp_statement(schema_meta, columns, temp_table, on_conflict, returning) do
      column_names = Enum.map(columns, fn column -> column.source end)
      target = {schema_meta.prefix, schema_meta.source}
      dml_opts = [on_conflict: dml_on_conflict(on_conflict), returning: returning]

      QuackDB.DML.insert_into_select(target, column_names, temp_table, column_names, dml_opts)
    end

    # Translates Ecto's `{action, params, targets}` on_conflict tuple into the
    # form passed to `QuackDB.DML`.
    #
    # Results are `:raise`, `:nothing`, or `{:nothing, targets}`. Only the shapes
    # admitted by `assert_supported!/6` are handled.
    defp dml_on_conflict({:nothing, _params, []}), do: :nothing
    defp dml_on_conflict({:nothing, _params, conflict_targets}), do: {:nothing, conflict_targets}
    defp dml_on_conflict({:raise, _params, []}), do: :raise

    # Rejects insert_all requests the append path cannot serve; returns `:ok`
    # otherwise. Checks run in this order:
    #
    #   1. rows given as a query (`%Ecto.Query{}` or `{%Ecto.Query{}, params}`) -> raise
    #   2. no placeholders, list `returning`, on_conflict `{:raise, _, []}`
    #      -> column types are only required when RETURNING is non-empty
    #   3. no placeholders, list `returning`, on_conflict `{:nothing, _, _}`
    #      -> column types are always required
    #   4. non-empty placeholders -> raise
    #   5. anything else (other on_conflict strategies) -> raise
    defp assert_supported!(schema_meta, rows, on_conflict, returning, placeholders, opts) do
      plain? = placeholders == [] and is_list(returning)

      cond do
        query_rows?(rows) ->
          unsupported!(
            :schema_inserts,
            "insert_method: :append does not support insert_all from queries"
          )

        plain? and match?({:raise, _params, []}, on_conflict) ->
          assert_insert_select_columns!(schema_meta, returning != [], opts, "returning")

        plain? and match?({:nothing, _params, _targets}, on_conflict) ->
          assert_insert_select_columns!(schema_meta, true, opts, "on_conflict: :nothing")

        placeholders != [] ->
          unsupported!(
            :schema_inserts,
            "insert_method: :append does not support placeholders"
          )

        true ->
          unsupported!(
            :schema_inserts,
            "insert_method: :append only supports plain insert_all or on_conflict: :nothing"
          )
      end
    end

    # True when insert_all rows are supplied as a query instead of a list.
    defp query_rows?({%Ecto.Query{}, _params}), do: true
    defp query_rows?(%Ecto.Query{}), do: true
    defp query_rows?(_rows), do: false

    # Checks that staging is possible.
    #
    # When `needed?` is `true`, the temp-table path needs column types, taken
    # either from an Ecto schema or from an explicit `:columns` option; without
    # them this raises, naming `feature` as the cause. Returns `:ok` when not
    # needed or when types are available.
    defp assert_insert_select_columns!(_schema_meta, false, _opts, _feature), do: :ok

    defp assert_insert_select_columns!(schema_meta, true, opts, feature) do
      cond do
        schema_meta.schema != nil ->
          :ok

        Keyword.has_key?(opts, :columns) ->
          :ok

        true ->
          unsupported!(
            :schema_inserts,
            "insert_method: :append with #{feature} requires a schema or explicit append columns"
          )
      end
    end

    # Returns the connection stored in the process dictionary under
    # `{Ecto.Adapters.SQL, pool}`. When nothing usable is stored (`nil` or
    # `:undefined`), falls back to `ecto_pool/1`.
    # NOTE(review): presumably Ecto.Adapters.SQL stores a checked-out connection
    # there (e.g. during a transaction); confirm against the Ecto version in use.
    defp ecto_connection(%{pid: pool} = adapter_meta) do
      stored = Process.get({Ecto.Adapters.SQL, pool})

      if stored in [nil, :undefined], do: ecto_pool(adapter_meta), else: stored
    end

    # Pool reference for this process.
    #
    # With a `:partition_supervisor`, this is a `PartitionSupervisor` via-tuple
    # keyed by `self()`. Otherwise it is the adapter's `:pid`.
    defp ecto_pool(%{partition_supervisor: {supervisor, _rest}}) do
      {:via, PartitionSupervisor, {supervisor, self()}}
    end

    defp ecto_pool(%{pid: pool_pid}) do
      pool_pid
    end

    # Row-shaped append payload: one `[{field, value}]` list per row, in header
    # order, with each value normalized for its declared column type.
    #
    # Rows whose keys already follow the header exactly take the in-order fast
    # path. Other rows are read key by key, which raises `KeyError` for a missing
    # header field.
    defp append_rows(header, rows, options) do
      declared = Keyword.get(options, :columns, [])
      types = Enum.map(header, &column_type(declared, &1))

      for row <- rows do
        case ordered_row_entries(row, header, types, []) do
          :error -> fetched_row_entries(row, header, types)
          {:ok, entries} -> entries
        end
      end
    end

    # Fast path for a row whose `{field, value}` pairs appear in exactly header
    # order with nothing extra.
    #
    # Returns `{:ok, normalized_entries}` when the row matches. Any difference in
    # order, length, or field name yields `:error`.
    defp ordered_row_entries([{field, value} | rest], [field | fields], [type | types], acc) do
      entry = {field, normalize_append_value(value, type)}
      ordered_row_entries(rest, fields, types, [entry | acc])
    end

    defp ordered_row_entries([], [], [], acc), do: {:ok, Enum.reverse(acc)}

    defp ordered_row_entries(_row, _header, _column_types, _acc), do: :error

    # Slow path: reads each header field from `row` by key (raising `KeyError` if
    # absent) and returns normalized `{field, value}` entries in header order.
    defp fetched_row_entries(row, header, column_types) do
      for {field, type} <- Enum.zip(header, column_types) do
        raw = Keyword.fetch!(row, field)
        {field, normalize_append_value(raw, type)}
      end
    end

    # Column-shaped append payload: `[{field, values}]` in header order.
    #
    # Each `values` list is in row order and normalized for its declared type.
    # Values are prepended per row and each column is reversed once at the end.
    defp append_columns(header, rows, options) do
      declared = Keyword.get(options, :columns, [])
      types = Enum.map(header, &column_type(declared, &1))
      empty_columns = List.duplicate([], length(header))

      reversed_columns =
        Enum.reduce(rows, empty_columns, fn row, acc ->
          row_values = append_row_values(row, header, types)
          prepend_column_values(row_values, acc)
        end)

      Enum.zip_with(header, reversed_columns, fn field, values ->
        {field, Enum.reverse(values)}
      end)
    end

    # Normalized values of one row in header order.
    #
    # Uses the in-order fast path when the row's keys follow the header, and key
    # lookup otherwise.
    defp append_row_values(row, header, column_types) do
      case ordered_row_values(row, header, column_types, []) do
        :error -> fetched_row_values(row, header, column_types)
        {:ok, in_order} -> in_order
      end
    end

    # Fast path yielding bare normalized values.
    #
    # Succeeds with `{:ok, values}` only when the row's `{field, value}` pairs
    # follow the header exactly. Otherwise returns `:error`.
    defp ordered_row_values([], [], [], acc), do: {:ok, Enum.reverse(acc)}

    defp ordered_row_values([{field, value} | rest], [field | fields], [type | types], acc) do
      normalized = normalize_append_value(value, type)
      ordered_row_values(rest, fields, types, [normalized | acc])
    end

    defp ordered_row_values(_row, _header, _column_types, _acc), do: :error

    # Slow path: normalized values looked up by key in header order; raises
    # `KeyError` when a header field is missing from `row`.
    defp fetched_row_values(row, header, column_types) do
      for {field, type} <- Enum.zip(header, column_types) do
        normalize_append_value(Keyword.fetch!(row, field), type)
      end
    end

    # Pushes each value onto the front of the accumulator at the same position.
    # Pairing is positional and stops at the shorter list.
    defp prepend_column_values(values, accumulators) do
      Enum.zip_with(values, accumulators, fn value, column -> [value | column] end)
    end

    # Prepares a value for the appender.
    #
    # A plain (non-struct) map headed for a `:varchar` column is JSON-encoded;
    # `column_type/2` maps `{:json, _}` columns to `:varchar`. Everything else,
    # including `nil` and structs, passes through unchanged.
    defp normalize_append_value(value, :varchar) when is_map(value) and not is_struct(value),
      do: JSON.encode!(value)

    defp normalize_append_value(value, _type), do: value

    # Non-raising type lookup for value normalization.
    #
    # `{:json, _}` collapses to `:varchar`. Returns `nil` when `columns` has no
    # entry for `column`.
    defp column_type(columns, column) do
      declared = column_type!(columns, column)
      if match?({:json, _}, declared), do: :varchar, else: declared
    rescue
      KeyError -> nil
    end

    # Options handed to QuackDB.
    #
    # Contains the whitelisted append options and `:schema` from the Ecto prefix.
    # Unless the caller supplied `:columns`, it also gets column types derived
    # from the Ecto schema for `header`.
    defp append_options(schema_meta, header, opts) do
      options = schema_meta |> then(&maybe_put_schema(base_options(opts), &1))

      if Keyword.has_key?(options, :columns),
        do: options,
        else: maybe_put_schema_columns(options, schema_meta, header)
    end

    # Adds `schema: prefix` when the Ecto prefix is set; otherwise returns `opts`.
    defp maybe_put_schema(opts, %{prefix: prefix}) do
      if is_nil(prefix), do: opts, else: Keyword.put(opts, :schema, prefix)
    end

    # For schema-backed inserts, stores `[{source, append_type}]` for `header`
    # under `:columns`.
    #
    # The type comes from `QuackDB.Ecto.Type.column_type!/2` with `:append`.
    # Raises `KeyError` when a header source is not one of the schema's field
    # sources. Schemaless inserts return `opts` unchanged.
    defp maybe_put_schema_columns(opts, %{schema: nil}, _header), do: opts

    defp maybe_put_schema_columns(opts, %{schema: schema}, header) do
      types_by_source =
        for field <- schema.__schema__(:fields), into: %{} do
          source = schema.__schema__(:field_source, field)
          {source, QuackDB.Ecto.Type.column_type!(schema.__schema__(:type, field), :append)}
        end

      Keyword.put(opts, :columns, Enum.map(header, &{&1, Map.fetch!(types_by_source, &1)}))
    end

    # Keeps only `:timeout`, `:columns` and `:append_shape`, validates the shape,
    # and copies the caller's `:chunk_every` across as `:batch_size`.
    defp base_options(opts) do
      kept = Keyword.take(opts, [:timeout, :columns, :append_shape])
      maybe_put_batch_size(validate_append_shape!(kept), opts)
    end

    # Returns `options` unchanged when `:append_shape` is `:columns` (the default)
    # or `:rows`; raises an unsupported-feature error for any other shape.
    defp validate_append_shape!(options) do
      shape = Keyword.get(options, :append_shape, :columns)

      if shape in [:columns, :rows] do
        options
      else
        unsupported!(
          :schema_inserts,
          "unsupported append_shape for QuackDB append insert: #{inspect(shape)}"
        )
      end
    end

    # Copies `:chunk_every` from the caller's `opts` into `options` as
    # `:batch_size` when the key is present (even if its value is `nil`).
    defp maybe_put_batch_size(options, opts) do
      if Keyword.has_key?(opts, :chunk_every) do
        Keyword.put(options, :batch_size, Keyword.fetch!(opts, :chunk_every))
      else
        options
      end
    end

    # Raises a `QuackDB.Error` built with `:ecto_feature_not_supported`.
    # The error has `source: :client` and `%{feature: feature}` metadata.
    defp unsupported!(feature, message) do
      error =
        QuackDB.Error.new(:ecto_feature_not_supported, message,
          source: :client,
          metadata: %{feature: feature}
        )

      raise error
    end
  end
end