lib/pgflow/worker/server.ex

defmodule PgFlow.Worker.Server do
  @moduledoc """
  GenServer that polls pgmq and executes flow tasks.

  Each worker is responsible for a single flow/queue and can execute
  multiple tasks concurrently up to the configured limit.

  Implements the two-phase protocol:
  1. pgmq.read() - Reserve messages from pgmq (non-blocking)
  2. start_tasks() - Create step_tasks records and get task details

  ## Signal Strategies

  The worker supports two signal strategies for detecting new messages:

    * `:polling` - Adaptive jittered exponential backoff (1s → 5s).
      Polls fast when busy, backs off when idle.
    * `:notify` - LISTEN/NOTIFY via pgmq's `enable_notify_insert`.
      Near-instant wake-ups with a 30s fallback poll.

  ## Configuration

  Workers are configured via the `config` parameter passed to `start_link/1`:

      config = %{
        flow_module: MyApp.Flows.ProcessOrder,
        repo: MyApp.Repo,
        worker_id: "550e8400-e29b-41d4-a716-446655440000",
        max_concurrency: 10,
        batch_size: 10,
        signal_strategy: :polling,
        min_poll_interval: 1_000,
        max_poll_interval: 5_000,
        notify_fallback_interval: 30_000
      }

  ## State Structure

  The worker maintains the following state:

    * `flow_module` - The flow module being processed
    * `flow_slug` - String slug for the queue name
    * `worker_id` - UUID for this worker (string format)
    * `repo` - Ecto repo module
    * `task_supervisor` - PID of Task.Supervisor for async execution
    * `active_tasks` - Map of task_ref => task_metadata (includes timeout_timer_ref)
    * `max_concurrency` - Max parallel tasks (default: 10)
    * `batch_size` - Messages per poll (default: 10)
    * `visibility_timeout` - Seconds for message visibility (derived from flow's opt_timeout)
    * `signal_strategy` - `:polling` or `:notify`
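    * `signal_state` - Adaptive backoff state for `:polling` strategy (also holds the pending poll timer ref)
    * `notify_fallback_interval` - Milliseconds between fallback polls for the `:notify` strategy
    * `fallback_timer_ref` - Timer reference of the pending fallback poll, or `nil`
    * `worker_name` - Human-readable worker name used in log output
    * `flow_def` - Flow definition returned by the flow module's `__pgflow_definition__/0`
    * `lifecycle` - Worker lifecycle state machine (see `PgFlow.Worker.Lifecycle`)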

  ## Lifecycle

  1. **Initialization** - Worker registers itself in the database, starts polling loop
  2. **Polling** - Continuously polls the queue, dispatches tasks to Task.Supervisor
  3. **Task Execution** - Tasks run concurrently, worker tracks completion/failure
  4. **Graceful Shutdown** - Worker stops accepting tasks, waits for active tasks
  5. **Cleanup** - Marks worker as stopped in database

  ## Telemetry Events

  The worker emits the following telemetry events:

    * `[:pgflow, :worker, :start]` - Worker started
    * `[:pgflow, :worker, :stop]` - Worker stopped
    * `[:pgflow, :worker, :poll, :start]` - Poll cycle started
    * `[:pgflow, :worker, :poll, :stop]` - Poll cycle completed
    * `[:pgflow, :worker, :task, :start]` - Task execution started
    * `[:pgflow, :worker, :task, :stop]` - Task execution completed
    * `[:pgflow, :worker, :task, :exception]` - Task execution failed

  """

  use GenServer
  require Logger

  alias PgFlow.Context
  alias PgFlow.Logger, as: PgLogger
  alias PgFlow.Queries.Flows
  alias PgFlow.Queries.Workers, as: WorkerQueries
  alias PgFlow.Worker.Lifecycle

  # Bookkeeping kept for every in-flight task. Entries live in
  # `state.active_tasks`, keyed by the reference of the task started with
  # `Task.Supervisor.async_nolink/2`.
  @type task_metadata :: %{
          run_id: String.t(),
          # Step slug exactly as returned by the database (a string)
          step_slug: String.t(),
          task_index: non_neg_integer(),
          # Queue message id; the message is deleted once the task is completed
          msg_id: pos_integer(),
          # Timer that delivers `{:task_timeout, ref}` to the worker
          timeout_timer_ref: reference() | nil,
          # Process running the handler; terminated when the task times out
          task_pid: pid() | nil
        }

  # Poll scheduling state. All intervals are in milliseconds (they are passed
  # straight to `Process.send_after/3`).
  @type signal_state :: %{
          # Delay used for the most recently scheduled poll
          current_interval: pos_integer(),
          # Lower bound; also the delay used after a poll that found messages
          min_interval: pos_integer(),
          # Upper bound for the backoff after empty polls
          max_interval: pos_integer(),
          # Reference of the last scheduled `:poll` timer, or nil
          poll_timer_ref: reference() | nil
        }

  # Complete GenServer state, assembled in `init/1`.
  @type state :: %{
          flow_module: module(),
          # String form of the flow definition's slug; used as the queue name
          flow_slug: String.t(),
          worker_id: String.t(),
          # Human-readable name used in log output
          worker_name: String.t(),
          repo: module(),
          task_supervisor: pid(),
          # In-flight tasks keyed by task reference
          active_tasks: %{reference() => task_metadata()},
          max_concurrency: pos_integer(),
          batch_size: pos_integer(),
          # Taken from the flow's `:timeout` option (60 when unset)
          visibility_timeout: pos_integer(),
          signal_strategy: :polling | :notify,
          signal_state: signal_state(),
          # Milliseconds between fallback polls (`:notify` strategy)
          notify_fallback_interval: pos_integer(),
          # Reference of the pending `:fallback_poll` timer, or nil
          fallback_timer_ref: reference() | nil,
          # Value returned by `flow_module.__pgflow_definition__/0`
          flow_def: term(),
          lifecycle: Lifecycle.t()
        }

  # Client API

  @doc """
  Starts a worker process linked to the caller.

  `config` is a map that is handed unchanged to `init/1`. The process is
  started without a registered name.

  ## Options

    * `:flow_module` - (required) The flow module to process
    * `:repo` - (required) The Ecto repository module
    * `:worker_id` - (optional) UUID string for worker identification (generated if not provided)
    * `:worker_name` - (optional) Human-readable name used in logs (defaults to `"pgflow-<flow_slug>"`)
    * `:task_supervisor` - (optional) PID of Task.Supervisor (uses PgFlow.TaskSupervisor if not provided)
    * `:max_concurrency` - (optional) Maximum concurrent tasks (default: 10)
    * `:batch_size` - (optional) Messages to fetch per poll (default: 10)
    * `:signal_strategy` - (optional) Signal strategy, `:polling` or `:notify` (default: `:polling`)
    * `:min_poll_interval` - (optional) Minimum ms between polls (default: 1000)
    * `:max_poll_interval` - (optional) Maximum ms between polls (default: 5000)
    * `:notify_fallback_interval` - (optional) Fallback poll interval for `:notify` strategy (default: 30000)

  """
  @spec start_link(map()) :: GenServer.on_start()
  def start_link(config), do: GenServer.start_link(__MODULE__, config)

  @doc """
  Gracefully stops the worker.

  Issues a synchronous `:stop` call with no timeout, so the caller blocks until
  the worker has stopped accepting new tasks, waited for its active tasks and
  replied.
  """
  @spec stop(pid()) :: :ok
  def stop(pid), do: GenServer.call(pid, :stop, :infinity)

  @doc """
  Returns the worker's full internal state map.

  Intended for debugging; uses the default `GenServer.call/2` timeout.
  """
  @spec get_state(pid()) :: map()
  def get_state(pid), do: GenServer.call(pid, :get_state)

  # Server Callbacks

  @impl true
  def init(config) do
    # `:flow_module` and `:repo` are mandatory; a missing key raises KeyError.
    flow_module = Map.fetch!(config, :flow_module)
    repo = Map.fetch!(config, :repo)

    # The flow definition supplies the queue name (slug) and flow-level options.
    flow_def = flow_module.__pgflow_definition__()
    flow_slug = Atom.to_string(flow_def.slug)

    # Derive visibility_timeout from the flow's :timeout option (default: 60s)
    visibility_timeout = Keyword.get(flow_def.opts, :timeout, 60)

    # Signal strategy config
    signal_strategy = Map.get(config, :signal_strategy, :polling)
    min_poll_interval = Map.get(config, :min_poll_interval, 1_000)
    max_poll_interval = Map.get(config, :max_poll_interval, 5_000)
    notify_fallback_interval = Map.get(config, :notify_fallback_interval, 30_000)

    # Generate or use provided worker_id
    worker_id = Map.get(config, :worker_id, Ecto.UUID.generate())

    # Generate or use provided worker_name (human-readable identifier for logs)
    worker_name = Map.get(config, :worker_name) || "pgflow-#{flow_slug}"

    # Get or default task supervisor
    task_supervisor = Map.get(config, :task_supervisor, Process.whereis(PgFlow.TaskSupervisor))

    with task_supervisor when is_pid(task_supervisor) <- task_supervisor,
         :ok <- validate_flow_exists(repo, flow_slug, flow_module) do
      # Build signal state for adaptive backoff
      signal_state = %{
        current_interval: min_poll_interval,
        min_interval: min_poll_interval,
        max_interval: max_poll_interval,
        poll_timer_ref: nil
      }

      # Build state
      state = %{
        flow_module: flow_module,
        flow_slug: flow_slug,
        worker_id: worker_id,
        worker_name: worker_name,
        repo: repo,
        task_supervisor: task_supervisor,
        active_tasks: %{},
        # Both limits are documented as optional with a default of 10, so fall
        # back to that default instead of raising KeyError when they are omitted.
        max_concurrency: Map.get(config, :max_concurrency, 10),
        batch_size: Map.get(config, :batch_size, 10),
        visibility_timeout: visibility_timeout,
        signal_strategy: signal_strategy,
        signal_state: signal_state,
        notify_fallback_interval: notify_fallback_interval,
        fallback_timer_ref: nil,
        flow_def: flow_def,
        lifecycle:
          Lifecycle.new() |> Lifecycle.transition!(:starting) |> Lifecycle.transition!(:running)
      }

      # Register worker in database; startup is aborted when registration fails
      case register_worker(state) do
        {:ok, _} ->
          # Log startup banner with flow compilation status
          PgLogger.startup_banner(%{
            worker_name: worker_name,
            worker_id: worker_id,
            queue_name: flow_slug,
            flows: [%{flow_slug: flow_slug, status: :ready}]
          })

          # Emit telemetry event
          emit_telemetry([:worker, :start], %{}, %{
            worker_id: worker_id,
            worker_name: worker_name,
            flow_slug: flow_slug
          })

          # Start signal loop based on strategy
          state = schedule_initial_poll(state)

          {:ok, state}

        {:error, reason} ->
          Logger.error("Failed to register worker #{worker_id}: #{inspect(reason)}")
          {:stop, {:registration_failed, reason}}
      end
    else
      # No supervisor was configured and PgFlow.TaskSupervisor is not running
      nil -> {:stop, :task_supervisor_not_found}
    end
  end

  @impl true
  def handle_info(:poll, %{lifecycle: lifecycle} = state) do
    # Scheduled poll tick; a no-op once the lifecycle stops accepting work.
    next_state = if Lifecycle.can_accept_work?(lifecycle), do: do_poll_cycle(state), else: state
    {:noreply, next_state}
  end

  @impl true
  def handle_info(:poll_now, %{lifecycle: lifecycle} = state) do
    # Explicit wake-up request: poll right away instead of waiting for a timer.
    if Lifecycle.can_accept_work?(lifecycle) do
      polled =
        state
        # Drop the pending scheduled poll so the queue is not read twice
        |> cancel_poll_timer()
        # Push the fallback poll back (only has an effect for :notify)
        |> reset_fallback_timer()
        |> do_poll_cycle()

      {:noreply, polled}
    else
      {:noreply, state}
    end
  end

  @impl true
  def handle_info(:fallback_poll, %{lifecycle: lifecycle} = state) do
    # Safety-net poll: run a cycle, then arm the next fallback timer.
    if Lifecycle.can_accept_work?(lifecycle) do
      polled = do_poll_cycle(state)
      next_timer = Process.send_after(self(), :fallback_poll, polled.notify_fallback_interval)
      {:noreply, %{polled | fallback_timer_ref: next_timer}}
    else
      {:noreply, state}
    end
  end

  @impl true
  def handle_info({ref, result}, state) when is_reference(ref) do
    # Reply message from a task started with Task.Supervisor.async_nolink/2.
    case Map.pop(state.active_tasks, ref) do
      {nil, _} ->
        # Unknown task reference, ignore
        {:noreply, state}

      {task_meta, new_active_tasks} ->
        # The result has arrived, so the monitor is no longer needed. Releasing
        # it (and flushing a :DOWN that may already be queued) spares the worker
        # one redundant message per completed task.
        Process.demonitor(ref, [:flush])

        # Cancel the timeout timer
        cancel_task_timeout(task_meta)
        handle_task_success(task_meta, result, state)
        state = %{state | active_tasks: new_active_tasks}

        # Only poll if the completed step has downstream dependents.
        # This optimizes the :notify strategy by avoiding unnecessary polls
        # when a terminal step completes (no downstream work to pick up).
        # For steps with dependents, we poll immediately because:
        # 1. complete_task may enqueue downstream tasks via start_ready_steps
        # 2. The NOTIFY for those inserts may be throttled (pgmq throttle_interval_ms)
        # 3. Without immediate poll, worker would wait for fallback_poll (30s)
        has_dependents = step_has_dependents?(state.flow_module, task_meta.step_slug)

        state =
          if has_dependents and Lifecycle.can_accept_work?(state.lifecycle) do
            schedule_immediate_poll(state)
          else
            state
          end

        {:noreply, state}
    end
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, reason}, state) do
    # Monitor notification: a tracked task process went down without its
    # result having been handled.
    case state.active_tasks do
      %{^ref => task_meta} = tracked ->
        cancel_task_timeout(task_meta)
        handle_task_failure(task_meta, reason, state)
        remaining = %{state | active_tasks: Map.delete(tracked, ref)}

        # Failed tasks are re-queued for retry, so always poll promptly while
        # the worker is still accepting work.
        if Lifecycle.can_accept_work?(remaining.lifecycle) do
          {:noreply, schedule_immediate_poll(remaining)}
        else
          {:noreply, remaining}
        end

      _untracked ->
        # Reference does not belong to an active task; nothing to do
        {:noreply, state}
    end
  end

  @impl true
  def handle_info({:task_timeout, ref}, state) do
    # Timeout timer fired for a task that may or may not still be running.
    case Map.pop(state.active_tasks, ref) do
      {nil, _} ->
        # Task already completed, ignore
        {:noreply, state}

      {task_meta, new_active_tasks} ->
        # The task is being abandoned here, so stop monitoring it first; the
        # :DOWN caused by the termination below would otherwise only be
        # received and dropped as an unknown reference.
        Process.demonitor(ref, [:flush])

        # Terminate the task process
        if task_meta.task_pid do
          Task.Supervisor.terminate_child(state.task_supervisor, task_meta.task_pid)
        end

        # Resolve the timeout value for the error message
        timeout_seconds = resolve_task_timeout(state, task_meta.step_slug)

        # Report failure
        handle_task_failure(
          task_meta,
          "Task timed out after #{timeout_seconds}s",
          state
        )

        state = %{state | active_tasks: new_active_tasks}

        # A slot was just freed by a failure. Poll right away, as the :DOWN
        # failure path does, so waiting messages are not left until the next
        # scheduled or fallback poll.
        state =
          if Lifecycle.can_accept_work?(state.lifecycle) do
            schedule_immediate_poll(state)
          else
            state
          end

        {:noreply, state}
    end
  end

  @impl true
  def handle_call(:stop, _from, state) do
    # Run the whole shutdown sequence before replying, then stop normally.
    stopped_state = do_stop(state)
    {:stop, :normal, :ok, stopped_state}
  end

  @impl true
  # Debug helper: replies with the state unchanged.
  def handle_call(:get_state, _from, state), do: {:reply, state, state}

  # Graceful shutdown sequence: stop accepting work, wait for running tasks,
  # mark the worker stopped in the database, then log and emit telemetry.
  # Returns the state with its lifecycle moved to :stopped.
  defp do_stop(state) do
    stopping = %{state | lifecycle: Lifecycle.transition!(state.lifecycle, :stopping)}

    # Only announce the waiting phase when something is actually running
    if map_size(stopping.active_tasks) > 0 do
      PgLogger.shutdown(stopping.worker_name, :waiting)
    end

    wait_for_tasks(stopping)
    mark_worker_stopped(stopping)

    stopped = %{stopping | lifecycle: Lifecycle.transition!(stopping.lifecycle, :stopped)}

    PgLogger.shutdown(stopped.worker_name, :stopped)

    emit_telemetry(
      [:worker, :stop],
      %{},
      %{
        worker_id: stopped.worker_id,
        worker_name: stopped.worker_name,
        flow_slug: stopped.flow_slug
      }
    )

    stopped
  end

  @impl true
  def terminate(reason, state) do
    Logger.debug("Worker #{state.worker_id} terminating: #{inspect(reason)}")

    # do_stop/1 already marks the worker stopped; avoid doing it twice
    if Lifecycle.stopped?(state.lifecycle) do
      :ok
    else
      mark_worker_stopped(state)
      :ok
    end
  end

  # Private Functions

  # Registers this worker in the database under a function name derived from
  # the flow module.
  @spec register_worker(state()) :: {:ok, term()} | {:error, term()}
  defp register_worker(state) do
    WorkerQueries.register_worker(
      state.repo,
      state.worker_id,
      state.flow_slug,
      "elixir:#{state.flow_module}"
    )
  end

  # Best-effort: a failed database update is logged and otherwise ignored, so
  # shutdown always proceeds.
  @spec mark_worker_stopped(state()) :: :ok
  defp mark_worker_stopped(state) do
    outcome = WorkerQueries.mark_worker_stopped(state.repo, state.worker_id)

    case outcome do
      {:error, reason} ->
        Logger.warning("Failed to mark worker as stopped: #{inspect(reason)}")

      {:ok, _} ->
        nil
    end

    :ok
  end

  # Checks that the flow exists in the database. Raises with setup
  # instructions when it does not; a failed lookup is only logged and treated
  # as success so the worker can still start.
  defp validate_flow_exists(repo, flow_slug, flow_module) do
    case Flows.flow_exists?(repo, flow_slug) do
      {:ok, false} ->
        message = """
        Flow "#{flow_slug}" is not compiled in the database.

        The PGMQ queue for this flow does not exist. Run the migration to compile it:

            mix pgflow.gen.flow_migration #{inspect(flow_module)}
            mix ecto.migrate

        Or if you haven't created the migration yet, generate it first.
        """

        raise message

      {:ok, true} ->
        :ok

      {:error, error} ->
        Logger.warning("Failed to check if flow exists: #{inspect(error)}")
        :ok
    end
  end

  # Arms the first timers after startup according to the signal strategy.
  defp schedule_initial_poll(state) do
    case state.signal_strategy do
      :notify ->
        # Fallback timer first, then a zero-delay poll that picks up messages
        # already sitting in the queue.
        fallback_timer =
          Process.send_after(self(), :fallback_poll, state.notify_fallback_interval)

        poll_timer = Process.send_after(self(), :poll, 0)

        put_in(
          %{state | fallback_timer_ref: fallback_timer},
          [:signal_state, :poll_timer_ref],
          poll_timer
        )

      :polling ->
        schedule_next_poll(state, :initial)
    end
  end

  # Arms the next :poll timer. After a productive (or the initial) poll the
  # minimum interval is used; after an empty poll a random delay between the
  # minimum and min(previous * 3, maximum) is drawn.
  @spec schedule_next_poll(state(), :found_messages | :empty | :initial) :: state()
  defp schedule_next_poll(state, poll_result) do
    backoff = state.signal_state

    delay =
      case poll_result do
        result when result in [:found_messages, :initial] ->
          backoff.min_interval

        :empty ->
          ceiling = min(backoff.current_interval * 3, backoff.max_interval)
          Enum.random(backoff.min_interval..max(backoff.min_interval, ceiling))
      end

    timer = Process.send_after(self(), :poll, delay)

    %{state | signal_state: %{backoff | current_interval: delay, poll_timer_ref: timer}}
  end

  # Replaces any pending poll with a zero-delay one and resets the backoff to
  # its minimum interval.
  defp schedule_immediate_poll(state) do
    cleared = cancel_poll_timer(state)
    timer = Process.send_after(self(), :poll, 0)

    backoff = %{
      cleared.signal_state
      | current_interval: cleared.signal_state.min_interval,
        poll_timer_ref: timer
    }

    %{cleared | signal_state: backoff}
  end

  # Cancels the tracked poll timer (if any), removes one :poll message that may
  # already be in the mailbox, and clears the stored reference.
  @spec cancel_poll_timer(state()) :: state()
  defp cancel_poll_timer(state) do
    timer = state.signal_state.poll_timer_ref

    if is_nil(timer) do
      state
    else
      Process.cancel_timer(timer)

      # The timer may have fired before it was cancelled
      receive do
        :poll -> :ok
      after
        0 -> :ok
      end

      put_in(state, [:signal_state, :poll_timer_ref], nil)
    end
  end

  # Restarts the fallback-poll countdown. Only acts for the :notify strategy;
  # any other state is returned untouched.
  @spec reset_fallback_timer(state()) :: state()
  defp reset_fallback_timer(%{signal_strategy: :notify, fallback_timer_ref: previous} = state) do
    if previous do
      Process.cancel_timer(previous)

      # Remove a :fallback_poll that was delivered before the cancel
      receive do
        :fallback_poll -> :ok
      after
        0 -> :ok
      end
    end

    %{
      state
      | fallback_timer_ref:
          Process.send_after(self(), :fallback_poll, state.notify_fallback_interval)
    }
  end

  defp reset_fallback_timer(state), do: state

  # One complete poll cycle: emit start telemetry, read and dispatch, emit stop
  # telemetry with the elapsed monotonic time, then schedule the next poll
  # (for :polling only).
  defp do_poll_cycle(state) do
    started_at = System.monotonic_time()

    poll_metadata = fn worker ->
      %{
        worker_id: worker.worker_id,
        flow_slug: worker.flow_slug,
        active_tasks: map_size(worker.active_tasks)
      }
    end

    emit_telemetry([:worker, :poll, :start], %{}, poll_metadata.(state))

    {polled, outcome} = poll_and_dispatch(state)
    elapsed = System.monotonic_time() - started_at

    emit_telemetry([:worker, :poll, :stop], %{duration: elapsed}, poll_metadata.(polled))

    case polled.signal_strategy do
      :notify ->
        # Nothing is scheduled here: the next poll comes from a notification
        # or from the fallback timer.
        polled

      :polling ->
        schedule_next_poll(polled, outcome)
    end
  end

  # Reads up to min(free slots, batch_size) messages and dispatches them.
  # Returns the updated state plus :found_messages or :empty; being at
  # capacity and read errors both count as :empty.
  @spec poll_and_dispatch(state()) :: {state(), :found_messages | :empty}
  defp poll_and_dispatch(state) do
    free_slots = state.max_concurrency - map_size(state.active_tasks)

    if free_slots > 0 do
      PgLogger.polling(state.worker_name)

      read_limit = min(free_slots, state.batch_size)

      read_result =
        Flows.read(state.repo, state.flow_slug, state.visibility_timeout, read_limit)

      case read_result do
        {:ok, []} ->
          PgLogger.task_count(state.worker_name, 0)
          {state, :empty}

        {:ok, messages} ->
          PgLogger.task_count(state.worker_name, length(messages))
          {start_and_dispatch_tasks(state, messages), :found_messages}

        {:error, reason} ->
          Logger.error("Failed to poll queue #{state.flow_slug}: #{inspect(reason)}")
          {state, :empty}
      end
    else
      # Every slot is busy, so the queue is not read at all
      {state, :empty}
    end
  end

  # Second phase of the protocol: pass the reserved message ids to start_tasks
  # and dispatch every returned task row. On error the state is returned
  # unchanged after logging.
  @spec start_and_dispatch_tasks(state(), list(list())) :: state()
  defp start_and_dispatch_tasks(state, messages) do
    # The message id is the first column of each row
    msg_ids = Enum.map(messages, fn [id | _rest] -> id end)

    started = Flows.start_tasks(state.repo, state.flow_slug, msg_ids, state.worker_id)

    case started do
      {:error, reason} ->
        Logger.error("Failed to start tasks for flow #{state.flow_slug}: #{inspect(reason)}")
        state

      {:ok, task_details} ->
        Enum.reduce(task_details, state, &dispatch_task(&2, &1))
    end
  end

  # Starts one task row returned by pgflow.start_tasks under the task
  # supervisor and records it in `active_tasks`.
  #
  # Raises when the row's step slug does not match any step of the flow module.
  @spec dispatch_task(state(), list()) :: state()
  defp dispatch_task(state, task_detail) do
    # Parse task detail row from pgflow.start_tasks:
    # [flow_slug, run_id (binary UUID), step_slug, input, msg_id, task_index, flow_input]
    # - input: step-specific input (raw element for map, {} for root, deps for dependent)
    # - flow_input: original flow input (only for root non-map steps, NULL otherwise)
    [_flow_slug, run_id_bin, step_slug, input, msg_id, task_index, flow_input] = task_detail

    # Convert binary UUID to string format
    run_id = Ecto.UUID.load!(run_id_bin)

    # Find the step by comparing slugs as strings. The slug comes from the
    # database, so it is never turned into an atom before it is known to match
    # a step of the module; the atom used afterwards is the one the module
    # itself defines.
    defined_steps = state.flow_module.__pgflow_definition__().steps
    step_def = Enum.find(defined_steps, fn step -> to_string(step.slug) == step_slug end)

    if is_nil(step_def) do
      raise """
      Step #{inspect(step_slug)} not found in #{inspect(state.flow_module)} definition.

      Steps defined in module: #{inspect(Enum.map(defined_steps, & &1.slug))}
      Step slug (queue) from database: #{inspect(step_slug)}

      This usually means the database schema is out of sync with your Elixir code.
      Re-run the migration to sync: mix ecto.migrate
      """
    end

    step_slug_atom = step_def.slug

    # Parse inputs - Postgrex returns JSONB as maps/lists/primitives
    input_data = decode_json_if_needed(input)
    flow_input_data = decode_json_if_needed(flow_input)

    # Route input based on step type (matching TypeScript reference pattern):
    # - Map steps: receive raw array element directly
    # - Root steps (no deps): receive flow_input directly
    # - Dependent steps: receive deps object {dep1: val1, dep2: val2, ...}
    handler_input = route_handler_input(step_def, input_data, flow_input_data)

    # Get handler function from flow module
    handler = state.flow_module.__pgflow_handler__(step_slug_atom)

    # Build context struct with flow_input available for lazy access
    context = %Context{
      run_id: run_id,
      step_slug: step_slug_atom,
      task_index: task_index,
      attempt: 1,
      repo: state.repo,
      flow_input: flow_input_data || :not_loaded
    }

    # Telemetry metadata is built up front so the task closure captures only
    # this small map instead of the whole worker state.
    telemetry_meta = %{
      worker_id: state.worker_id,
      flow_slug: state.flow_slug,
      run_id: run_id,
      step_slug: step_slug,
      task_index: task_index
    }

    # Start task under supervisor
    task =
      Task.Supervisor.async_nolink(state.task_supervisor, fn ->
        start_time = System.monotonic_time()

        emit_telemetry([:worker, :task, :start], %{}, telemetry_meta)

        try do
          result = handler.(handler_input, context)
          duration = System.monotonic_time() - start_time

          emit_telemetry(
            [:worker, :task, :stop],
            %{duration: duration},
            Map.put(telemetry_meta, :output, result)
          )

          {:ok, result}
        catch
          # Errors, exits and throws are all turned into an {:error, message}
          # result so the worker can report the failure
          kind, reason ->
            duration = System.monotonic_time() - start_time
            stacktrace = __STACKTRACE__

            emit_telemetry(
              [:worker, :task, :exception],
              %{duration: duration},
              Map.merge(telemetry_meta, %{kind: kind, reason: reason})
            )

            {:error, Exception.format(kind, reason, stacktrace)}
        end
      end)

    # Schedule task timeout (resolve_task_timeout/2 returns seconds)
    timeout_ms = resolve_task_timeout(state, step_slug_atom) * 1_000
    timeout_timer_ref = Process.send_after(self(), {:task_timeout, task.ref}, timeout_ms)

    # Track task with timeout timer and pid
    task_meta = %{
      run_id: run_id,
      step_slug: step_slug,
      task_index: task_index,
      msg_id: msg_id,
      timeout_timer_ref: timeout_timer_ref,
      task_pid: task.pid
    }

    active_tasks = Map.put(state.active_tasks, task.ref, task_meta)
    %{state | active_tasks: active_tasks}
  end

  # Timeout for a step: the step's own timeout when set, otherwise the flow's
  # :timeout option, otherwise 60. Accepts the slug as an atom or a string.
  defp resolve_task_timeout(state, step_slug) do
    slug_atom =
      case step_slug do
        slug when is_atom(slug) -> slug
        slug -> String.to_atom(slug)
      end

    step_def = get_step_definition(state.flow_module, slug_atom)
    flow_timeout = Keyword.get(state.flow_def.opts, :timeout, 60)

    (step_def && step_def.timeout) || flow_timeout
  end

  # Cancels the task's timeout timer and removes a matching
  # {:task_timeout, ref} message that may already have been delivered.
  # Metadata without a timer reference is a no-op.
  defp cancel_task_timeout(task_meta) do
    case task_meta do
      %{timeout_timer_ref: timer} when is_reference(timer) ->
        Process.cancel_timer(timer)

        receive do
          {:task_timeout, ^timer} -> :ok
        after
          0 -> :ok
        end

      _no_timer ->
        :ok
    end
  end

  # Handles the value a task process returned:
  #   {:ok, output}    -> complete the task, then delete its queue message
  #   {:error, message} -> report a failure
  #   anything else    -> log a warning and report a failure
  @spec handle_task_success(task_metadata(), term(), state()) :: :ok
  defp handle_task_success(task_meta, {:ok, output}, state) do
    %{run_id: run_id, step_slug: step_slug, task_index: task_index} = task_meta
    payload = serialize_handler_output(output)

    case Flows.complete_task(state.repo, run_id, step_slug, task_index, payload) do
      {:error, reason} ->
        # The message is deliberately left in the queue. Once its visibility
        # timeout expires it is delivered again, which is why handlers have to
        # be idempotent (the same contract that applies to retries after a
        # crashed task).
        Logger.error(
          "Failed to mark task as completed: #{step_slug}[#{task_index}] - #{inspect(reason)}"
        )

      {:ok, _} ->
        maybe_emit_run_completed(state, run_id)

        # The message is removed only now that the database update is
        # confirmed. Deleting it when complete_task failed (deadlock,
        # connection hiccup, serialization rollback) would orphan the task:
        # its step_tasks row would stay un-updated with no message left for
        # any worker to pick up, and the step would wait forever.
        delete_message(state, task_meta.msg_id)
    end
  end

  defp handle_task_success(task_meta, {:error, error_message}, state),
    do: handle_task_failure(task_meta, error_message, state)

  defp handle_task_success(task_meta, unexpected_result, state) do
    Logger.warning("Task returned unexpected result format: #{inspect(unexpected_result)}")

    failure_message = "Task returned unexpected result: #{inspect(unexpected_result)}"
    handle_task_failure(task_meta, failure_message, state)
  end

  # Records a task failure through Flows.fail_task and logs the outcome.
  # `reason` may be an exit reason or a ready-made message string. The queue
  # message is never deleted here, so the task can be retried once its
  # visibility timeout expires.
  @spec handle_task_failure(task_metadata(), term(), state()) :: :ok
  defp handle_task_failure(task_meta, reason, state) do
    error_message =
      cond do
        reason == :normal -> "Task exited normally without result"
        reason == :shutdown or match?({:shutdown, _}, reason) -> "Task was shut down"
        is_binary(reason) -> reason
        true -> inspect(reason)
      end

    # Context handed to the structured failure log
    log_ctx = %{
      worker_name: state.worker_name,
      worker_id: state.worker_id,
      flow_slug: state.flow_slug,
      step_slug: task_meta.step_slug,
      run_id: task_meta.run_id,
      task_index: task_meta.task_index,
      msg_id: task_meta.msg_id
    }

    failed =
      Flows.fail_task(
        state.repo,
        task_meta.run_id,
        task_meta.step_slug,
        task_meta.task_index,
        error_message
      )

    case failed do
      {:error, fail_reason} ->
        Logger.error(
          "Failed to mark task as failed: #{task_meta.step_slug}[#{task_meta.task_index}] - #{inspect(fail_reason)}"
        )

      {:ok, result} ->
        # Retry details are included in the log when the result row has them
        PgLogger.task_failed(log_ctx, error_message, extract_retry_info(result, state))
        maybe_emit_run_failed(state, task_meta.run_id, error_message)
    end

    :ok
  end

  # Builds retry details for logging from a fail_task result row.
  # The row is expected to carry attempts_count as its sixth column; any other
  # shape (including nil) yields nil. max_attempts and base_delay come from
  # the flow options, and the delay is base_delay * 2^(attempts - 1).
  defp extract_retry_info([_flow, _run, _step, _index, _status, attempts | _], state)
       when is_integer(attempts) do
    opts = state.flow_def.opts
    max_attempts = Keyword.get(opts, :max_attempts, 3)
    backoff = Keyword.get(opts, :base_delay, 1) * :math.pow(2, attempts - 1)

    %{attempt: attempts, max_attempts: max_attempts, delay_seconds: Float.round(backoff, 1)}
  end

  defp extract_retry_info(_result, _state), do: nil

  # Best-effort removal of a queue message; a failure is only logged.
  @spec delete_message(state(), pos_integer()) :: :ok
  defp delete_message(state, msg_id) do
    deletion = Flows.delete_message(state.repo, state.flow_slug, msg_id)

    case deletion do
      {:error, reason} ->
        Logger.warning("Failed to delete message #{msg_id}: #{inspect(reason)}")

      {:ok, _} ->
        nil
    end

    :ok
  end

  # Normalizes handler output before it is stored:
  #   nil          -> empty map
  #   map or list  -> returned as-is when Jason can encode it, otherwise a map
  #                   describing the serialization error
  #   anything else -> wrapped as %{"_raw" => inspect(value)}
  defp serialize_handler_output(output) do
    cond do
      is_nil(output) ->
        %{}

      is_map(output) or is_list(output) ->
        case Jason.encode(output) do
          {:error, reason} ->
            Logger.warning("Handler output not JSON-serializable: #{inspect(reason)}")
            %{"_serialization_error" => inspect(reason), "_raw" => inspect(output)}

          {:ok, _encoded} ->
            output
        end

      true ->
        %{"_raw" => inspect(output)}
    end
  end

  # Binaries are decoded as JSON; values that are already native terms
  # (nil, maps, lists, numbers, booleans) pass through untouched.
  defp decode_json_if_needed(value) when is_binary(value), do: Jason.decode!(value)

  defp decode_json_if_needed(value)
       when is_nil(value) or is_map(value) or is_list(value) or is_number(value) or
              is_boolean(value),
       do: value

  # Looks up a step in the flow module's compiled definition.
  #
  # Returns the first step whose `slug` equals `step_slug`, or nil when the
  # flow declares no such step.
  defp get_step_definition(flow_module, step_slug) do
    Enum.find(flow_module.__pgflow_definition__().steps, &(&1.slug == step_slug))
  end

  # Check if a step has downstream dependents (other steps that depend on it).
  # Used to determine if we should poll after completing a task - only poll
  # if the completed step might trigger downstream work.
  #
  # `step_slug` may be an atom or a string. A string slug is compared against
  # the *names* of the atoms already listed in each step's `depends_on`
  # instead of being converted with String.to_atom/1: atoms are never garbage
  # collected, so minting one per runtime slug would let unexpected slugs grow
  # the atom table without bound. The matching result is the same - a string
  # matches exactly the atom it would have been converted to.
  #
  # Returns true when at least one step lists the slug in `depends_on`,
  # false otherwise (including slugs the flow does not know about).
  defp step_has_dependents?(flow_module, step_slug) do
    depends_on_slug? =
      if is_atom(step_slug) do
        fn dependency -> dependency === step_slug end
      else
        fn dependency -> is_atom(dependency) and Atom.to_string(dependency) == step_slug end
      end

    Enum.any?(flow_module.__pgflow_definition__().steps, fn step ->
      Enum.any?(step.depends_on, depends_on_slug?)
    end)
  end

  # Picks the value a step handler receives, by step type (mirrors the
  # TypeScript reference implementation).
  # See: pgflow-reference/pkgs/edge-worker/src/flow/StepTaskExecutor.ts lines 108-119
  #
  #   * map steps                  -> `input_data` (the raw array element)
  #   * root steps (no depends_on) -> `flow_input_data`
  #   * dependent steps            -> `input_data` (the deps object)
  #
  # Only a non-map step without dependencies is routed to the flow input;
  # every other case gets `input_data`.
  defp route_handler_input(step_def, input_data, flow_input_data) do
    root_step? = step_def.step_type != :map and Enum.empty?(step_def.depends_on)

    if root_step? do
      flow_input_data
    else
      input_data
    end
  end

  # Drains task-related messages from the process mailbox until `active_tasks`
  # is empty, or until the `receive` below sees no matching message for 30
  # seconds. Returns `:ok` in both cases.
  #
  # The 30s `after` timeout belongs to a single `receive`, so it starts over
  # after every message handled here; it is not a cap on the total wait.
  #
  # Only `active_tasks` is carried forward between iterations: the return
  # values of `handle_task_success/3` and `handle_task_failure/3` are discarded.
  @spec wait_for_tasks(state()) :: :ok
  defp wait_for_tasks(%{active_tasks: active_tasks}) when map_size(active_tasks) == 0 do
    # Nothing left in flight - done.
    :ok
  end

  defp wait_for_tasks(state) do
    receive do
      # NOTE(review): presumably the reply message of a supervised Task
      # (`{ref, result}`) - confirm against where the tasks are started.
      {ref, result} when is_reference(ref) ->
        # Process the task result during shutdown
        case Map.pop(state.active_tasks, ref) do
          {nil, _} ->
            # Ref is not (or no longer) tracked; keep waiting for the others.
            wait_for_tasks(state)

          {task_meta, new_active_tasks} ->
            cancel_task_timeout(task_meta)
            handle_task_success(task_meta, result, state)
            wait_for_tasks(%{state | active_tasks: new_active_tasks})
        end

      {:DOWN, ref, :process, _pid, reason} ->
        # Process the task failure during shutdown
        case Map.pop(state.active_tasks, ref) do
          {nil, _} ->
            # Ref is not (or no longer) tracked, e.g. the task was already
            # removed by one of the other branches; keep waiting.
            wait_for_tasks(state)

          {task_meta, new_active_tasks} ->
            cancel_task_timeout(task_meta)
            handle_task_failure(task_meta, reason, state)
            wait_for_tasks(%{state | active_tasks: new_active_tasks})
        end

      {:task_timeout, ref} ->
        case Map.pop(state.active_tasks, ref) do
          {nil, _} ->
            # Already completed, ignore
            wait_for_tasks(state)

          {task_meta, new_active_tasks} ->
            # Terminate the task and report failure (same as normal handler)
            # `task_pid` may be nil/false, in which case there is nothing to terminate.
            if task_meta.task_pid do
              Task.Supervisor.terminate_child(state.task_supervisor, task_meta.task_pid)
            end

            # Resolved again here only to build the failure message.
            timeout_seconds = resolve_task_timeout(state, task_meta.step_slug)

            handle_task_failure(
              task_meta,
              "Task timed out after #{timeout_seconds}s",
              state
            )

            wait_for_tasks(%{state | active_tasks: new_active_tasks})
        end
    after
      # No matching message for 30s: give up waiting and return even though
      # tasks are still registered as active.
      30_000 ->
        Logger.warning(
          "Timeout waiting for tasks to complete, #{map_size(state.active_tasks)} tasks still active"
        )

        :ok
    end
  end

  # Emits the [:pgflow, :run, :completed] telemetry event when the run is
  # found with status "completed".
  #
  # The run is re-read through Flows.get_run/2; for any other status, or when
  # the lookup does not return a matching {:ok, run}, nothing is emitted and
  # :ok is returned.
  defp maybe_emit_run_completed(state, run_id) do
    with {:ok, %{status: "completed", output: output}} <- Flows.get_run(state.repo, run_id) do
      measurements = %{system_time: System.system_time()}
      metadata = %{flow_slug: state.flow_slug, run_id: run_id, output: output}

      :telemetry.execute([:pgflow, :run, :completed], measurements, metadata)
    else
      _not_completed -> :ok
    end
  end

  # Emits the [:pgflow, :run, :failed] telemetry event when the run is found
  # with status "failed".
  #
  # The run is re-read through Flows.get_run/2; `error` is passed through in
  # the event metadata. For any other lookup result nothing is emitted and
  # :ok is returned.
  defp maybe_emit_run_failed(state, run_id, error) do
    run_failed? = match?({:ok, %{status: "failed"}}, Flows.get_run(state.repo, run_id))

    if run_failed? do
      :telemetry.execute(
        [:pgflow, :run, :failed],
        %{system_time: System.system_time()},
        %{flow_slug: state.flow_slug, run_id: run_id, error: error}
      )
    else
      :ok
    end
  end

  # Executes a telemetry event with the :pgflow prefix prepended to
  # `event_name`, e.g. [:task, :start] is emitted as [:pgflow, :task, :start].
  @spec emit_telemetry(list(atom()), map(), map()) :: :ok
  defp emit_telemetry(event_name, measurements, metadata) do
    :telemetry.execute([:pgflow | event_name], measurements, metadata)
  end
end