defmodule Mix.Tasks.Pgflow.Gen.FlowMigration do
@shortdoc "Generates an Ecto migration to compile a PgFlow flow"
@moduledoc """
Generates an Ecto migration that registers a PgFlow flow in the database.
## Usage
mix pgflow.gen.flow_migration MyApp.Flows.ArticleFlow
mix pgflow.gen.flow_migration MyApp.Flows.ArticleFlow --migrations-path priv/repo/migrations
## Options
* `--migrations-path` - Path to the migrations directory.
Defaults to `priv/repo/migrations`.
## Generated SQL
The migration executes SQL statements that:
1. Create the flow record and PGMQ queue
2. Add each step with its dependencies and configuration
Example generated migration:
defmodule MyApp.Repo.Migrations.CompileArticleFlow do
use Ecto.Migration
def up do
execute "SELECT pgflow.create_flow('article_flow', 3, 5, 120)"
execute "SELECT pgflow.add_step('article_flow', 'fetch_article', ARRAY[]::text[], NULL, NULL, NULL, NULL, 'single')"
execute "SELECT pgflow.add_step('article_flow', 'summarize', ARRAY['fetch_article']::text[], NULL, NULL, NULL, NULL, 'single')"
end
def down do
execute "DELETE FROM pgflow.deps WHERE flow_slug = 'article_flow'"
execute "DELETE FROM pgflow.steps WHERE flow_slug = 'article_flow'"
execute "DELETE FROM pgflow.flows WHERE flow_slug = 'article_flow'"
execute "SELECT pgmq.drop_queue('article_flow')"
end
end
## Requirements
The flow module must:
1. Use `PgFlow.Flow`
2. Define a valid flow with `@flow` and at least one step
3. Be compilable (no syntax errors)
## Example
# Define a flow
defmodule MyApp.Flows.ArticleFlow do
use PgFlow.Flow
@flow slug: :article_flow, max_attempts: 3
step :fetch do
fn input, _ctx -> %{data: input} end
end
step :process, depends_on: [:fetch] do
fn deps, _ctx -> %{result: deps.fetch} end
end
end
# Generate the migration
$ mix pgflow.gen.flow_migration MyApp.Flows.ArticleFlow
# Run the migration
$ mix ecto.migrate
"""
use Mix.Task
alias Mix.Tasks.Pgflow.Helpers
alias PgFlow.Flow.Definition
alias PgFlow.FlowCompiler
@impl Mix.Task
def run(args) do
# Parse arguments
{opts, args, _} =
OptionParser.parse(args,
switches: [migrations_path: :string],
aliases: [p: :migrations_path]
)
# Validate we have a module name
case args do
[module_string] ->
generate_migration(module_string, opts)
[] ->
Mix.raise("""
Missing flow module argument.
Usage: mix pgflow.gen.flow_migration MyApp.Flows.MyFlow
""")
_ ->
Mix.raise("""
Too many arguments provided.
Usage: mix pgflow.gen.flow_migration MyApp.Flows.MyFlow
""")
end
end
defp generate_migration(module_string, opts) do
# Ensure the application is loaded so we can access flow modules
Mix.Task.run("compile", [])
# Convert string to module
module = String.to_atom("Elixir.#{module_string}")
# Verify the module exists and has the required function
unless Code.ensure_loaded?(module) do
Mix.raise("""
Module #{module_string} could not be loaded.
Make sure the module exists and the project compiles successfully.
""")
end
unless function_exported?(module, :__pgflow_definition__, 0) do
Mix.raise("""
Module #{module_string} is not a PgFlow flow.
The module must use PgFlow.Flow and define at least one step.
Example:
defmodule #{module_string} do
use PgFlow.Flow
@flow slug: :my_flow, max_attempts: 3
step :my_step do
fn input, _ctx -> %{result: input} end
end
end
""")
end
# Get the flow definition
definition = module.__pgflow_definition__()
# Validate the definition
case Definition.validate(definition) do
{:ok, _} ->
:ok
{:error, reason} ->
Mix.raise("""
Flow definition validation failed: #{reason}
Please fix the flow definition and try again.
""")
end
# Generate SQL statements
sql_statements = FlowCompiler.compile(definition)
# Check if flow has cron configured
has_cron = FlowCompiler.has_cron?(module)
# Determine migrations path
migrations_path = Keyword.get(opts, :migrations_path, "priv/repo/migrations")
# Create migrations directory if it doesn't exist
File.mkdir_p!(migrations_path)
# Generate timestamp for migration filename
timestamp = Helpers.generate_timestamp()
# Generate migration module name from flow slug
flow_slug = Atom.to_string(definition.slug)
migration_module = "Compile#{Helpers.camelize(flow_slug)}"
# Generate the migration content
migration_content =
generate_migration_content(migration_module, flow_slug, sql_statements, has_cron)
# Write the migration file
filename = "#{timestamp}_compile_#{flow_slug}.exs"
filepath = Path.join(migrations_path, filename)
File.write!(filepath, migration_content)
Mix.shell().info("""
Generated migration: #{filepath}
Run the migration with:
mix ecto.migrate
This will:
1. Create the '#{flow_slug}' flow in pgflow.flows
2. Create the PGMQ queue 'pgmq.q_#{flow_slug}'
3. Register all #{length(definition.steps)} step(s)
After migration, your worker can process tasks from this flow.
""")
end
defp generate_migration_content(migration_module, flow_slug, sql_statements, has_cron) do
up_statements = format_execute_statements(sql_statements)
unschedule_sql = if has_cron, do: FlowCompiler.cron_unschedule_sql(String.to_atom(flow_slug))
down_statements = Helpers.build_down_statements(flow_slug, has_cron, unschedule_sql)
"""
defmodule PgFlow.Repo.Migrations.#{migration_module} do
@moduledoc \"\"\"
Compiles the '#{flow_slug}' flow definition into the database.
This migration creates:
- The flow record in pgflow.flows
- The PGMQ queue for this flow
- All step definitions in pgflow.steps
Generated by: mix pgflow.gen.flow_migration
\"\"\"
use Ecto.Migration
def up do
#{up_statements}
end
def down do
#{down_statements}
end
end
"""
end
defp format_execute_statements(sql_statements) do
Enum.map_join(sql_statements, "\n", fn sql ->
escaped = String.replace(sql, "\"", "\\\"")
~s( execute "#{escaped}")
end)
end
end