# lib/dagex/migrations.ex

# The code in this file was initially taken from the Oban codebase
# (https://github.com/sorentwo/oban/blob/v2.10.1/lib/oban/migrations.ex) and is
# subject to the following copyright and licensing terms, which may differ from
# the remainder of this project:
# Copyright 2019 Parker Selbert
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

defmodule Dagex.Migrations do
  @moduledoc """
  Migrations create and modify the database tables Dagex needs to function.

  ## Usage

  To use migrations in your application you'll need to generate an `Ecto.Migration` that wraps
  calls to `Dagex.Migrations`:

  ```bash
  mix ecto.gen.migration add_dagex
  ```

  Open the generated migration in your editor and call the `up` and `down` functions on
  `Dagex.Migrations`:

  ```elixir
  defmodule MyApp.Repo.Migrations.AddDagex do
    use Ecto.Migration

    def up, do: Dagex.Migrations.up()

    def down, do: Dagex.Migrations.down()
  end
  ```

  This will run all of Dagex's versioned migrations for your database. Migrations between versions
  are idempotent. As new versions are released you may need to run additional migrations.

  Now, run the migration to create the table:

  ```bash
  mix ecto.migrate
  ```
  """

  use Ecto.Migration

  # Number of the first versioned migration. `up/1` starts from it when no version has been
  # recorded yet, and it is the default `:version` for `down/1`.
  @initial_version 1
  # Number of the newest versioned migration; the default `:version` for `up/1`.
  @current_version 3

  @spec up(opts :: keyword()) :: :ok
  @doc """
  Applies every pending versioned migration, in ascending order, up to the target version.

  The target is taken from the `:version` option and defaults to the current version. Versions
  at or below the one already recorded for the database are skipped, so calling `up/1` again
  with the same target is a no-op.

  ## Example

  Migrate to the current version:

      Dagex.Migrations.up()

  Migrate only as far as version 2:

      Dagex.Migrations.up(version: 2)
  """
  def up(opts \\ []) when is_list(opts) do
    target = Keyword.get(opts, :version, @current_version)

    case migrated_version(repo()) do
      # No version recorded yet: start from the very first migration.
      0 ->
        change(@initial_version..target, :up)

      # Partially migrated: continue with the version after the recorded one.
      applied when applied < target ->
        change((applied + 1)..target, :up)

      # Already at (or past) the requested version.
      _applied ->
        :ok
    end

    Ecto.Migration.flush()
    :ok
  end

  @spec down(opts :: keyword()) :: :ok
  @doc """
  Rolls back versioned migrations, in descending order, starting at the version recorded for
  the database and ending with the version given by `:version` (inclusive).

  `:version` defaults to the initial version, which rolls back everything. Because the `down`
  step of the `:version` migration itself is run, the database ends up one version below
  `:version`. Nothing is rolled back when the recorded version is already below `:version`.

  ## Example

  Roll back every migration:

      Dagex.Migrations.down()

  Roll back version 3 and version 2, leaving the database at version 1:

      Dagex.Migrations.down(version: 2)
  """
  def down(opts \\ []) when is_list(opts) do
    target = Keyword.get(opts, :version, @initial_version)

    # A recorded version of 0 is treated as the initial version, so the first migration's
    # `down` still runs.
    applied =
      repo()
      |> migrated_version()
      |> max(@initial_version)

    if applied >= target, do: change(applied..target, :down)

    Ecto.Migration.flush()
    :ok
  end

  @spec initial_version() :: integer()
  @doc false
  # Returns the value of the `@initial_version` module attribute.
  def initial_version do
    @initial_version
  end

  @spec current_version() :: integer()
  @doc false
  # Returns the value of the `@current_version` module attribute.
  def current_version do
    @current_version
  end

  @spec migrated_version(repo :: Ecto.Repo.t()) :: integer()
  @doc false
  # Reads the Dagex schema version for the database behind `repo`.
  #
  # The version is stored as the comment on the `dagex_nodes` table (see `record_version/2`).
  # Returns that comment parsed as an integer, or 0 when the query fails, the table does not
  # exist, or the table has no comment. A comment that is not an integer string raises from
  # `String.to_integer/1`.
  def migrated_version(repo) do
    # The lookup must yield at most one row, otherwise the version is read as 0:
    #   * `classoid`/`objsubid = 0` keep only the table-level comment, ignoring column
    #     comments and descriptions of objects from other catalogs that share the OID;
    #   * `pg_table_is_visible/1` keeps only the relation that an unqualified `dagex_nodes`
    #     resolves to, which is the same one `COMMENT ON TABLE dagex_nodes` writes to.
    query = """
    SELECT pg_description.description
    FROM pg_class
    LEFT JOIN pg_description
      ON pg_description.objoid = pg_class.oid
      AND pg_description.classoid = 'pg_class'::regclass
      AND pg_description.objsubid = 0
    WHERE pg_class.relname = 'dagex_nodes'
      AND pg_table_is_visible(pg_class.oid)
    """

    case repo.query(query) do
      {:ok, %{rows: [[version]]}} when is_binary(version) -> String.to_integer(version)
      _result -> 0
    end
  end

  @spec setup_node_type(String.t(), String.t()) :: Macro.t()
  @doc """
  Expands to the `execute/2` calls that install (and, in the reverse direction, undo) the
  trigger function and trigger keeping `dagex_nodes` in sync with the rows of `table_name`.

  The second argument must be one of the literal version strings `"1.0.0"`, `"2.0.0"` or
  `"3.0.0"`; it selects which revision of the trigger SQL is generated. The macro calls
  `execute/2` unqualified, so it has to be expanded where that function is in scope
  (presumably inside an `Ecto.Migration`).

  Version `"1.0.0"` inserts a node after each row insert and deletes it after each row delete;
  its reverse statements drop the trigger and the function.
  """
  defmacro setup_node_type(table_name, "1.0.0") do
    quote bind_quoted: [table_name: table_name] do
      create_function = """
      CREATE OR REPLACE FUNCTION dagex_maintain_nodes_#{table_name}()
      RETURNS TRIGGER AS
      $$
      BEGIN
      IF (TG_OP = 'INSERT') THEN
        INSERT INTO dagex_nodes (node_type, ext_id) VALUES('#{table_name}', NEW.id::text);
      ELSIF (TG_OP = 'DELETE') THEN
        DELETE FROM dagex_nodes WHERE ext_id = OLD.id::text;
      END IF;
      RETURN NULL;
      END;
      $$
      LANGUAGE plpgsql;
      """

      drop_function = """
      DROP FUNCTION dagex_maintain_nodes_#{table_name}();
      """

      create_trigger = """
      CREATE OR REPLACE TRIGGER dagex_maintain_nodes_trigger_#{table_name}
      AFTER INSERT OR DELETE ON #{table_name}
      FOR EACH ROW EXECUTE FUNCTION dagex_maintain_nodes_#{table_name}();
      """

      drop_trigger = """
      DROP TRIGGER dagex_maintain_nodes_trigger_#{table_name} ON #{table_name};
      """

      # Each call pairs a forward statement with the statement that reverses it.
      execute(create_function, drop_function)
      execute(create_trigger, drop_trigger)
    end
  end

  # Version "2.0.0" of the node-maintenance trigger for `table_name`.
  #
  # Forward: the trigger function rejects rows whose id is '*', makes sure a '*' node exists
  # for the table, mirrors id changes into `dagex_nodes`, and the trigger additionally fires
  # on UPDATE. Reverse: function and trigger are replaced with their "1.0.0" definitions.
  defmacro setup_node_type(table_name, "2.0.0") do
    quote bind_quoted: [table_name: table_name] do
      function_v2 = """
      CREATE OR REPLACE FUNCTION dagex_maintain_nodes_#{table_name}()
      RETURNS TRIGGER AS
      $$
      BEGIN
      IF (TG_OP = 'INSERT') THEN
        IF (NEW.id::text = '*') THEN
          raise exception 'Cannot create node with id *; that id is reserved for the collection supremum' USING ERRCODE = 'check_violation', CONSTRAINT = 'dagex_reserved_supremum_id';
        END IF;
        INSERT INTO dagex_nodes (node_type, ext_id) VALUES('#{table_name}', '*') ON CONFLICT DO NOTHING;
        INSERT INTO dagex_nodes (node_type, ext_id) VALUES('#{table_name}', NEW.id::text);
      ELSIF (TG_OP = 'UPDATE') THEN
        IF (NEW.id != OLD.id) THEN
          UPDATE dagex_nodes SET ext_id = NEW.id::text WHERE ext_id = OLD.id::text;
        END IF;
      ELSIF (TG_OP = 'DELETE') THEN
        DELETE FROM dagex_nodes WHERE ext_id = OLD.id::text;
      END IF;
      RETURN NULL;
      END;
      $$
      LANGUAGE plpgsql;
      """

      function_v1 = """
      CREATE OR REPLACE FUNCTION dagex_maintain_nodes_#{table_name}()
      RETURNS TRIGGER AS
      $$
      BEGIN
      IF (TG_OP = 'INSERT') THEN
        INSERT INTO dagex_nodes (node_type, ext_id) VALUES('#{table_name}', NEW.id::text);
      ELSIF (TG_OP = 'DELETE') THEN
        DELETE FROM dagex_nodes WHERE ext_id = OLD.id::text;
      END IF;
      RETURN NULL;
      END;
      $$
      LANGUAGE plpgsql;
      """

      trigger_v2 = """
      CREATE OR REPLACE TRIGGER dagex_maintain_nodes_trigger_#{table_name}
      AFTER INSERT OR UPDATE OR DELETE ON #{table_name}
      FOR EACH ROW EXECUTE FUNCTION dagex_maintain_nodes_#{table_name}();
      """

      trigger_v1 = """
      CREATE OR REPLACE TRIGGER dagex_maintain_nodes_trigger_#{table_name}
      AFTER INSERT OR DELETE ON #{table_name}
      FOR EACH ROW EXECUTE FUNCTION dagex_maintain_nodes_#{table_name}();
      """

      # Each call pairs a forward statement with the statement that reverses it.
      execute(function_v2, function_v1)
      execute(trigger_v2, trigger_v1)
    end
  end

  # Version "3.0.0" of the node-maintenance trigger for `table_name`.
  #
  # Forward: replaces the trigger function with one that no longer reserves the '*' id and no
  # longer creates a '*' node; it inserts a node per inserted row, mirrors id changes, and
  # removes the node when the row is deleted. Reverse: restores the "2.0.0" function.
  #
  # `dagex_nodes` is keyed by the (node_type, ext_id) pair written on INSERT, so the UPDATE
  # and DELETE statements of the forward function are scoped to this table's node_type.
  # Matching on ext_id alone would also rewrite or delete nodes of other node types whose
  # rows happen to share the same id text. The reverse SQL is intentionally left exactly as
  # version "2.0.0" defines it.
  defmacro setup_node_type(table_name, "3.0.0") do
    quote bind_quoted: [table_name: table_name] do
      execute(
        """
        CREATE OR REPLACE FUNCTION dagex_maintain_nodes_#{table_name}()
        RETURNS TRIGGER AS
        $$
        BEGIN
        IF (TG_OP = 'INSERT') THEN
          INSERT INTO dagex_nodes (node_type, ext_id) VALUES('#{table_name}', NEW.id::text);
        ELSIF (TG_OP = 'UPDATE') THEN
          IF (NEW.id != OLD.id) THEN
            UPDATE dagex_nodes SET ext_id = NEW.id::text WHERE node_type = '#{table_name}' AND ext_id = OLD.id::text;
          END IF;
        ELSIF (TG_OP = 'DELETE') THEN
          DELETE FROM dagex_nodes WHERE node_type = '#{table_name}' AND ext_id = OLD.id::text;
        END IF;
        RETURN NULL;
        END;
        $$
        LANGUAGE plpgsql;
        """,
        """
        CREATE OR REPLACE FUNCTION dagex_maintain_nodes_#{table_name}()
        RETURNS TRIGGER AS
        $$
        BEGIN
        IF (TG_OP = 'INSERT') THEN
          IF (NEW.id::text = '*') THEN
            raise exception 'Cannot create node with id *; that id is reserved for the collection supremum' USING ERRCODE = 'check_violation', CONSTRAINT = 'dagex_reserved_supremum_id';
          END IF;
          INSERT INTO dagex_nodes (node_type, ext_id) VALUES('#{table_name}', '*') ON CONFLICT DO NOTHING;
          INSERT INTO dagex_nodes (node_type, ext_id) VALUES('#{table_name}', NEW.id::text);
        ELSIF (TG_OP = 'UPDATE') THEN
          IF (NEW.id != OLD.id) THEN
            UPDATE dagex_nodes SET ext_id = NEW.id::text WHERE ext_id = OLD.id::text;
          END IF;
        ELSIF (TG_OP = 'DELETE') THEN
          DELETE FROM dagex_nodes WHERE ext_id = OLD.id::text;
        END IF;
        RETURN NULL;
        END;
        $$
        LANGUAGE plpgsql;
        """
      )

      # The trigger definition is the same in both directions: versions "2.0.0" and "3.0.0"
      # both fire on INSERT, UPDATE and DELETE.
      execute(
        """
        CREATE OR REPLACE TRIGGER dagex_maintain_nodes_trigger_#{table_name}
        AFTER INSERT OR UPDATE OR DELETE ON #{table_name}
        FOR EACH ROW EXECUTE FUNCTION dagex_maintain_nodes_#{table_name}();
        """,
        """
        CREATE OR REPLACE TRIGGER dagex_maintain_nodes_trigger_#{table_name}
        AFTER INSERT OR UPDATE OR DELETE ON #{table_name}
        FOR EACH ROW EXECUTE FUNCTION dagex_maintain_nodes_#{table_name}();
        """
      )
    end
  end

  # Runs `direction` (`:up` or `:down`) on the versioned migration module for every index in
  # `range`, in range order, then records the resulting version.
  #
  # Migration modules are resolved by name: index 3 maps to `Dagex.Migrations.V03`.
  defp change(range, direction, opts \\ []) do
    Enum.each(range, fn index ->
      suffix = index |> Integer.to_string() |> String.pad_leading(2, "0")
      migration = Module.concat([__MODULE__, "V#{suffix}"])

      apply(migration, direction, [opts])
    end)

    # Going up leaves the database at the highest version applied; going down leaves it one
    # below the lowest version that was rolled back.
    resulting_version =
      case direction do
        :up -> Enum.max(range)
        :down -> Enum.min(range) - 1
      end

    record_version(opts, resulting_version)
  end

  # Stores `version` as the comment on the `dagex_nodes` table, which is where
  # `migrated_version/1` reads it back from. Version 0 is not recorded and returns `:ok`.
  defp record_version(_opts, version) do
    if version === 0 do
      :ok
    else
      execute("COMMENT ON TABLE dagex_nodes IS '#{version}'")
    end
  end
end