defmodule Mix.Tasks.Pgflow.Gen.JobMigration do
@shortdoc "Generates an Ecto migration to compile a PgFlow job"
@moduledoc """
Generates an Ecto migration that registers a PgFlow job in the database.
Jobs are single-step flows under the hood. This task generates migration SQL
that creates the flow, adds the step, and marks it as `flow_type = 'job'`.
## Usage
mix pgflow.gen.job_migration MyApp.Jobs.SendEmail
mix pgflow.gen.job_migration MyApp.Jobs.SendEmail --migrations-path priv/repo/migrations
## Options
* `--migrations-path` - Path to the migrations directory.
Defaults to `priv/repo/migrations`.
## Generated SQL
The migration executes SQL statements that:
1. Create the flow record and PGMQ queue
2. Add the step (slug defaults to the job queue name, or an explicit name from `perform :name do`)
3. Mark the flow as `flow_type = 'job'`
Example generated migration:
defmodule MyApp.Repo.Migrations.CompileSendEmail do
use Ecto.Migration
def up do
execute "SELECT pgflow.create_flow('send_email', 3, 5, 60)"
execute "SELECT pgflow.add_step('send_email', 'send_email', ARRAY[]::text[], NULL, NULL, NULL, NULL, 'single')"
execute "UPDATE pgflow.flows SET flow_type = 'job' WHERE flow_slug = 'send_email'"
end
def down do
execute "DELETE FROM pgflow.deps WHERE flow_slug = 'send_email'"
execute "DELETE FROM pgflow.steps WHERE flow_slug = 'send_email'"
execute "DELETE FROM pgflow.flows WHERE flow_slug = 'send_email'"
execute "SELECT pgmq.drop_queue('send_email')"
end
end
## Requirements
The job module must:
1. Use `PgFlow.Job`
2. Define a valid job with `@job` and a `perform` block
3. Be compilable (no syntax errors)
"""
use Mix.Task
alias Mix.Tasks.Pgflow.Helpers
alias PgFlow.Flow.Definition
alias PgFlow.JobCompiler
@impl Mix.Task
def run(args) do
{opts, args, _} =
OptionParser.parse(args,
switches: [migrations_path: :string],
aliases: [p: :migrations_path]
)
case args do
[module_string] ->
generate_migration(module_string, opts)
[] ->
Mix.raise("""
Missing job module argument.
Usage: mix pgflow.gen.job_migration MyApp.Jobs.MyJob
""")
_ ->
Mix.raise("""
Too many arguments provided.
Usage: mix pgflow.gen.job_migration MyApp.Jobs.MyJob
""")
end
end
defp generate_migration(module_string, opts) do
Mix.Task.run("compile", [])
module = String.to_atom("Elixir.#{module_string}")
unless Code.ensure_loaded?(module) do
Mix.raise("""
Module #{module_string} could not be loaded.
Make sure the module exists and the project compiles successfully.
""")
end
unless function_exported?(module, :__pgflow_definition__, 0) do
Mix.raise("""
Module #{module_string} is not a PgFlow job.
The module must use PgFlow.Job and define a perform block.
Example:
defmodule #{module_string} do
use PgFlow.Job
@job slug: :my_job, max_attempts: 3
perform :my_step do
fn input, _ctx -> %{result: input} end
end
end
""")
end
definition = module.__pgflow_definition__()
unless definition.flow_type == :job do
Mix.raise("""
Module #{module_string} is not a PgFlow job (it appears to be a flow).
Use `mix pgflow.gen.flow_migration` for flow modules, or make sure this module uses `PgFlow.Job`.
""")
end
case Definition.validate(definition) do
{:ok, _} ->
:ok
{:error, reason} ->
Mix.raise("""
Job definition validation failed: #{reason}
Please fix the job definition and try again.
""")
end
sql_statements = JobCompiler.compile(definition)
# Check if job has cron configured
has_cron = JobCompiler.has_cron?(module)
migrations_path = Keyword.get(opts, :migrations_path, "priv/repo/migrations")
File.mkdir_p!(migrations_path)
timestamp = Helpers.generate_timestamp()
job_slug = Atom.to_string(definition.slug)
migration_module = "Compile#{Helpers.camelize(job_slug)}"
migration_content =
generate_migration_content(migration_module, job_slug, sql_statements, has_cron)
filename = "#{timestamp}_compile_#{job_slug}.exs"
filepath = Path.join(migrations_path, filename)
File.write!(filepath, migration_content)
Mix.shell().info("""
Generated migration: #{filepath}
Run the migration with:
mix ecto.migrate
This will:
1. Create the '#{job_slug}' job as a flow in pgflow.flows
2. Create the PGMQ queue 'pgmq.q_#{job_slug}'
3. Register the step definition
4. Mark flow_type = 'job' for dashboard differentiation
After migration, your worker can process tasks from this job.
""")
end
defp generate_migration_content(migration_module, job_slug, sql_statements, has_cron) do
up_statements = format_execute_statements(sql_statements)
unschedule_sql = if has_cron, do: JobCompiler.cron_unschedule_sql(String.to_atom(job_slug))
down_statements = Helpers.build_down_statements(job_slug, has_cron, unschedule_sql)
"""
defmodule PgFlow.Repo.Migrations.#{migration_module} do
@moduledoc \"\"\"
Compiles the '#{job_slug}' job definition into the database.
This migration creates:
- The flow record in pgflow.flows (with flow_type = 'job')
- The PGMQ queue for this job
- The step definition
Generated by: mix pgflow.gen.job_migration
\"\"\"
use Ecto.Migration
def up do
#{up_statements}
end
def down do
#{down_statements}
end
end
"""
end
defp format_execute_statements(sql_statements) do
Enum.map_join(sql_statements, "\n", fn sql ->
escaped = String.replace(sql, "\"", "\\\"")
~s( execute "#{escaped}")
end)
end
end