Skip to main content

lib/mix/tasks/continuum.gen.migration.ex

defmodule Mix.Tasks.Continuum.Gen.Migration do
  @moduledoc """
  Generates the Ecto migration that creates Continuum's Postgres tables.

      mix continuum.gen.migration

  Writes a single migration file under `priv/repo/migrations/` (or whatever
  is configured for your repo) that creates: `continuum_runs`,
  monthly-partitioned `continuum_events`, `continuum_signals`,
  `continuum_timers`, `continuum_activity_tasks`,
  `continuum_activity_results`, `continuum_snapshots`,
  `continuum_workflow_versions`, and the `continuum_lease_token_seq` sequence.
  """
  use Mix.Task

  import Macro, only: [camelize: 1]

  @shortdoc "Generates a migration for Continuum's Postgres schema"

  @impl true
  def run(args) do
    {opts, _, _} = OptionParser.parse(args, switches: [repo: :string])

    repo = parse_repo(opts)
    path = source_migrations_path(repo)
    File.mkdir_p!(path)

    timestamp = timestamp()
    name = "create_continuum_tables"
    filename = Path.join(path, "#{timestamp}_#{name}.exs")
    module_name = "#{inspect(repo)}.Migrations.#{camelize(name)}"

    if File.exists?(filename) do
      Mix.raise("migration #{filename} already exists")
    end

    File.write!(filename, migration_source(module_name))
    Mix.shell().info("Created #{filename}")
  end

  defp source_migrations_path(repo) do
    priv = Keyword.get(repo.config(), :priv, "priv/repo")
    Path.join([File.cwd!(), priv, "migrations"])
  end

  defp parse_repo(opts) do
    case opts[:repo] do
      nil ->
        Application.get_env(:continuum, :repo) ||
          Mix.raise(
            "no repo configured. Pass --repo MyApp.Repo or set " <>
              ":continuum, :repo in config"
          )

      repo_str ->
        Module.concat([repo_str])
    end
  end

  defp timestamp do
    {{y, m, d}, {hh, mm, ss}} = :calendar.universal_time()
    "#{y}#{pad(m)}#{pad(d)}#{pad(hh)}#{pad(mm)}#{pad(ss)}"
  end

  defp pad(n) when n < 10, do: "0#{n}"
  defp pad(n), do: "#{n}"

  defp migration_source(module_name) do
    """
    defmodule #{module_name} do
      use Ecto.Migration

      def up do
        execute "CREATE SEQUENCE IF NOT EXISTS continuum_lease_token_seq"

        create table(:continuum_runs, primary_key: false, options: "WITH (fillfactor = 70)") do
          add :id, :uuid, primary_key: true
          add :workflow, :text, null: false
          add :version_hash, :bytea, null: false
          add :namespace, :text, null: false, default: "default"
          add :state, :text, null: false
          add :input, :bytea, null: false
          add :attributes, :map, null: false, default: %{}
          add :result, :bytea
          add :error, :bytea
          add :trace_context, :bytea
          add :parent_run_id, :uuid
          add :parent_command_id, :bytea
          add :correlation_id, :uuid
          add :continued_from_run_id, :uuid
          add :started_at, :utc_datetime_usec, null: false, default: fragment("now()")
          add :completed_at, :utc_datetime_usec
          add :lease_owner, :text
          add :lease_token, :bigint
          add :lease_expires_at, :utc_datetime_usec
          add :next_wakeup_at, :utc_datetime_usec
          add :retention_until, :utc_datetime_usec
          add :cancel_requested_at, :utc_datetime_usec
        end

        execute \"\"\"
        CREATE INDEX continuum_runs_dispatch_idx
          ON continuum_runs (next_wakeup_at NULLS LAST)
          WHERE state = 'suspended' AND lease_owner IS NULL
        \"\"\"

        execute \"\"\"
        CREATE INDEX continuum_runs_lease_idx
          ON continuum_runs (lease_expires_at)
          WHERE lease_owner IS NOT NULL
        \"\"\"

        execute \"\"\"
        CREATE INDEX continuum_runs_parent_idx
          ON continuum_runs (parent_run_id)
          WHERE parent_run_id IS NOT NULL
        \"\"\"

        execute \"\"\"
        CREATE INDEX continuum_runs_correlation_idx
          ON continuum_runs (correlation_id)
          WHERE correlation_id IS NOT NULL
        \"\"\"

        execute \"\"\"
        CREATE INDEX continuum_runs_continued_from_idx
          ON continuum_runs (continued_from_run_id)
          WHERE continued_from_run_id IS NOT NULL
        \"\"\"

        execute \"\"\"
        CREATE INDEX continuum_runs_correlation_completed_idx
          ON continuum_runs (correlation_id, completed_at)
          WHERE correlation_id IS NOT NULL
        \"\"\"

        execute \"\"\"
        CREATE INDEX continuum_runs_attributes_gin_idx
          ON continuum_runs USING gin (attributes)
        \"\"\"

        execute \"\"\"
        CREATE INDEX continuum_runs_namespace_state_idx
          ON continuum_runs (namespace, state)
        \"\"\"

        execute \"\"\"
        CREATE TABLE continuum_events (
          run_id uuid NOT NULL,
          seq bigint NOT NULL,
          event_type text NOT NULL,
          payload bytea NOT NULL,
          inserted_at timestamptz NOT NULL DEFAULT now(),
          PRIMARY KEY (run_id, seq, inserted_at)
        ) PARTITION BY RANGE (inserted_at)
        \"\"\"

        create_initial_event_partitions()

        create table(:continuum_signals) do
          add :run_id, :uuid, null: false
          add :name, :text, null: false
          add :payload, :bytea, null: false
          add :delivered, :boolean, null: false, default: false
          add :inserted_at, :utc_datetime_usec, null: false, default: fragment("now()")
        end

        execute \"\"\"
        CREATE INDEX continuum_signals_pending_idx
          ON continuum_signals (run_id, name)
          WHERE delivered = false
        \"\"\"

        create table(:continuum_timers, primary_key: false) do
          add :id, :uuid, primary_key: true
          add :run_id, :uuid, null: false
          add :fires_at, :utc_datetime_usec, null: false
          add :fired, :boolean, null: false, default: false
        end

        execute \"\"\"
        CREATE INDEX continuum_timers_due_idx
          ON continuum_timers (fires_at)
          WHERE fired = false
        \"\"\"

        create table(:continuum_activity_tasks, primary_key: false) do
          add :id, :uuid, primary_key: true
          add :run_id, :uuid, null: false
          add :seq, :bigint, null: false
          add :mfa, :bytea, null: false
          add :attempt, :integer, null: false, default: 1
          add :state, :text, null: false
          add :scheduled_at, :utc_datetime_usec, null: false, default: fragment("now()")
          add :available_at, :utc_datetime_usec, null: false, default: fragment("now()")
          add :lease_owner, :text
          add :lease_expires_at, :utc_datetime_usec
          add :result, :bytea
          add :error, :bytea
        end

        execute \"\"\"
        CREATE INDEX continuum_activity_tasks_pickup_idx
          ON continuum_activity_tasks (available_at)
          WHERE state = 'available'
        \"\"\"

        create table(:continuum_activity_results, primary_key: false) do
          add :activity_module, :text, null: false
          add :idempotency_key, :text, null: false
          add :run_id, :uuid, null: false
          add :seq, :bigint, null: false
          add :result, :bytea, null: false
          add :completed_at, :utc_datetime_usec, null: false, default: fragment("now()")
        end

        execute \"\"\"
        ALTER TABLE continuum_activity_results
          ADD PRIMARY KEY (activity_module, idempotency_key)
        \"\"\"

        create table(:continuum_snapshots) do
          add :run_id, :uuid, null: false
          add :through_seq, :bigint, null: false
          add :version_hash, :bytea, null: false
          add :format_version, :smallint, null: false, default: 1
          add :payload, :bytea, null: false
          add :taken_at, :utc_datetime_usec, null: false, default: fragment("now()")
        end

        create unique_index(:continuum_snapshots, [:run_id, :through_seq],
                 name: :continuum_snapshots_run_seq_idx
               )

        execute \"\"\"
        CREATE INDEX continuum_snapshots_latest_idx
          ON continuum_snapshots (run_id, through_seq DESC)
        \"\"\"

        create table(:continuum_workflow_versions, primary_key: false) do
          add :workflow, :text, null: false
          add :version_hash, :bytea, null: false
          add :entrypoint, :text, null: false
          add :registered_at, :utc_datetime_usec, null: false, default: fragment("now()")
        end

        execute \"\"\"
        ALTER TABLE continuum_workflow_versions
          ADD PRIMARY KEY (workflow, version_hash)
        \"\"\"
      end

      def down do
        drop_if_exists table(:continuum_workflow_versions)
        drop_if_exists table(:continuum_snapshots)
        drop_if_exists table(:continuum_activity_results)
        drop_if_exists table(:continuum_activity_tasks)
        drop_if_exists table(:continuum_timers)
        drop_if_exists table(:continuum_signals)
        drop_if_exists table(:continuum_events)
        drop_if_exists table(:continuum_runs)
        execute "DROP SEQUENCE IF EXISTS continuum_lease_token_seq"
      end

      defp create_initial_event_partitions do
        today = Date.utc_today()
        month = Date.new!(today.year, today.month, 1)

        for offset <- 0..3 do
          create_event_partition(Date.add(month, offset * 32) |> Date.beginning_of_month())
        end
      end

      defp create_event_partition(month) do
        next_month = month |> Date.add(32) |> Date.beginning_of_month()

        execute \"\"\"
        CREATE TABLE IF NOT EXISTS \#{event_partition_name(month)}
        PARTITION OF continuum_events
        FOR VALUES FROM ('\#{Date.to_iso8601(month)} 00:00:00+00')
        TO ('\#{Date.to_iso8601(next_month)} 00:00:00+00')
        \"\"\"
      end

      defp event_partition_name(%Date{year: year, month: month}) do
        "continuum_events_y\#{year}_m\#{pad2(month)}"
      end

      defp pad2(month) when month < 10, do: "0\#{month}"
      defp pad2(month), do: "\#{month}"
    end
    """
  end
end