diff --git a/.agents/scripts/commands/pulse.md b/.agents/scripts/commands/pulse.md index 154d111d8..2a46bc579 100644 --- a/.agents/scripts/commands/pulse.md +++ b/.agents/scripts/commands/pulse.md @@ -558,23 +558,25 @@ sleep 2 If a dispatch attempt exits immediately with provider/auth failure (for example `Token refresh failed`, `authentication`, `401`, `403`, `400` in startup logs), do not wait for next cycle. Re-dispatch in the same cycle via `headless-runtime-helper.sh run` with an explicit alternate model/provider and continue filling remaining slots. -**Launch validation is mandatory (t1452):** after each dispatch, verify the worker actually started and did not fall through to CLI usage output. +**Launch validation is mandatory (t1452/t1453):** after each dispatch, validate the launch with the wrapper helper. This keeps the gate deterministic and aligned with wrapper-side enforcement. ```bash -# 1) Process exists (session-key must map to a live run) -pgrep -fal "issue-|Issue #:" >/dev/null +# Source wrapper helper once per pulse run (safe when sourced) +source ~/.aidevops/agents/scripts/pulse-wrapper.sh -# 2) Startup log is not CLI help/usage output -LOG_PATH="/tmp/pulse--.log" -if [[ -f "$LOG_PATH" ]] && rg -q '^opencode run \[message\.\.]|^run opencode with a message|^Options:' "$LOG_PATH"; then - echo "Invalid worker launch for # (CLI usage output)" +# check_worker_launch returns 0 only when the worker process appears +# and no CLI usage-output markers are detected in known startup logs. +if ! check_worker_launch ; then + echo "Invalid worker launch for #" # Relaunch immediately via helper (never leave this for next pulse) fi ``` If validation fails, re-dispatch immediately via `headless-runtime-helper.sh run`, add a short issue comment noting the failed launch and correction, and continue filling slots. -9. **Fill-to-cap post-condition (t1449):** before ending the pulse cycle, compare active workers vs `MAX_WORKERS`. If below cap and runnable scoped issues/PR work exists in any repo class, continue dispatching until cap is reached or no runnable candidates remain. Do not leave slots idle because of class reservations when one class is PR-capped or empty. +9. **Fill-to-cap post-condition (t1449/t1453):** before ending the pulse cycle, compare active workers vs `MAX_WORKERS`. If below cap and runnable scoped issues/PR work exists in any repo class, continue dispatching until cap is reached or no runnable candidates remain. Do not leave slots idle because of class reservations when one class is PR-capped or empty. + + `pulse-wrapper.sh` now enforces this invariant after the LLM pulse pass via bounded backfill cycles (until max workers or no runnable work) and treats queued issues without live workers as launch-validation failures to backfill immediately. ### Candidate discovery baseline (t1443 + t1448) diff --git a/.agents/scripts/pulse-wrapper.sh b/.agents/scripts/pulse-wrapper.sh index 715d74883..53fab6f84 100755 --- a/.agents/scripts/pulse-wrapper.sh +++ b/.agents/scripts/pulse-wrapper.sh @@ -83,6 +83,8 @@ MAX_WORKERS_CAP="${MAX_WORKERS_CAP:-$(config_get "orchestration.max_workers_cap" DAILY_PR_CAP="${DAILY_PR_CAP:-5}" # Max PRs created per repo per day (GH#3821) PRODUCT_RESERVATION_PCT="${PRODUCT_RESERVATION_PCT:-60}" # % of worker slots reserved for product repos (t1423) QUALITY_DEBT_CAP_PCT="${QUALITY_DEBT_CAP_PCT:-$(config_get "orchestration.quality_debt_cap_pct" "30")}" # % cap for quality-debt dispatch share +PULSE_BACKFILL_MAX_ATTEMPTS="${PULSE_BACKFILL_MAX_ATTEMPTS:-3}" # Additional pulse passes when below utilization target (t1453) +PULSE_LAUNCH_GRACE_SECONDS="${PULSE_LAUNCH_GRACE_SECONDS:-20}" # Grace window for worker process to appear after dispatch (t1453) # Process guard limits (t1398) CHILD_RSS_LIMIT_KB="${CHILD_RSS_LIMIT_KB:-2097152}" # 2 GB default — kill child if RSS exceeds this @@ -106,6 +108,8 @@ QUALITY_DEBT_CAP_PCT=$(_validate_int QUALITY_DEBT_CAP_PCT "$QUALITY_DEBT_CAP_PCT if [[ "$QUALITY_DEBT_CAP_PCT" -gt 100 ]]; then QUALITY_DEBT_CAP_PCT=100 fi +PULSE_BACKFILL_MAX_ATTEMPTS=$(_validate_int PULSE_BACKFILL_MAX_ATTEMPTS "$PULSE_BACKFILL_MAX_ATTEMPTS" 3 0) +PULSE_LAUNCH_GRACE_SECONDS=$(_validate_int PULSE_LAUNCH_GRACE_SECONDS "$PULSE_LAUNCH_GRACE_SECONDS" 20 5) CHILD_RSS_LIMIT_KB=$(_validate_int CHILD_RSS_LIMIT_KB "$CHILD_RSS_LIMIT_KB" 2097152 1) CHILD_RUNTIME_LIMIT=$(_validate_int CHILD_RUNTIME_LIMIT "$CHILD_RUNTIME_LIMIT" 1800 1) SHELLCHECK_RSS_LIMIT_KB=$(_validate_int SHELLCHECK_RSS_LIMIT_KB "$SHELLCHECK_RSS_LIMIT_KB" 1048576 1) @@ -1668,6 +1672,212 @@ prefetch_contribution_watch() { return 0 } +####################################### +# Count active worker processes +# Returns: count via stdout +####################################### +count_active_workers() { + local count + count=$(ps axo command | grep '[/]full-loop' | grep -c '[.]opencode') || count=0 + echo "$count" + return 0 +} + +####################################### +# Get current max workers from pulse-max-workers file +# Returns: numeric value via stdout (defaults to 1) +####################################### +get_max_workers_target() { + local max_workers_file="${HOME}/.aidevops/logs/pulse-max-workers" + local max_workers + max_workers=$(cat "$max_workers_file" 2>/dev/null || echo "1") + [[ "$max_workers" =~ ^[0-9]+$ ]] || max_workers=1 + if [[ "$max_workers" -lt 1 ]]; then + max_workers=1 + fi + echo "$max_workers" + return 0 +} + +####################################### +# Count runnable backlog candidates across pulse scope +# Heuristic for t1453 utilization loop: +# - open unassigned, non-blocked issues +# - open PRs with failing checks or changes requested +# Returns: count via stdout +####################################### +count_runnable_candidates() { + local repos_json="${REPOS_JSON}" + if [[ ! -f "$repos_json" ]] || ! command -v jq &>/dev/null; then + echo "0" + return 0 + fi + + local total=0 + while IFS='|' read -r slug _path; do + [[ -n "$slug" ]] || continue + + local issue_json + issue_json=$(gh issue list --repo "$slug" --state open --json assignees,labels --limit 100 2>/dev/null) || issue_json="[]" + local issue_count + issue_count=$(echo "$issue_json" | jq '[.[] | select((.assignees | length) == 0 and (.labels | map(.name) | index("status:blocked") | not))] | length' 2>/dev/null) || issue_count=0 + [[ "$issue_count" =~ ^[0-9]+$ ]] || issue_count=0 + + local pr_json + pr_json=$(gh pr list --repo "$slug" --state open --json reviewDecision,statusCheckRollup --limit 100 2>/dev/null) || pr_json="[]" + local pr_count + pr_count=$(echo "$pr_json" | jq '[.[] | select(.reviewDecision == "CHANGES_REQUESTED" or ((.statusCheckRollup // []) | any((.conclusion // .state) == "FAILURE")))] | length' 2>/dev/null) || pr_count=0 + [[ "$pr_count" =~ ^[0-9]+$ ]] || pr_count=0 + + total=$((total + issue_count + pr_count)) + done < <(jq -r '.initialized_repos[] | select(.pulse == true and (.local_only // false) == false and .slug != "") | "\(.slug)|\(.path)"' "$repos_json" 2>/dev/null) + + echo "$total" + return 0 +} + +####################################### +# Count queued issues that do not have an active worker process +# This is a launch-validation signal: queued labels imply dispatch, +# but no matching worker indicates startup failure or immediate exit. +# Returns: count via stdout +####################################### +count_queued_without_worker() { + local repos_json="${REPOS_JSON}" + if [[ ! -f "$repos_json" ]] || ! command -v jq &>/dev/null; then + echo "0" + return 0 + fi + + local total=0 + while IFS= read -r slug; do + [[ -n "$slug" ]] || continue + local queued_numbers + queued_numbers=$(gh issue list --repo "$slug" --state open --label "status:queued" --json number --jq '.[].number' --limit 100 2>/dev/null) || queued_numbers="" + if [[ -z "$queued_numbers" ]]; then + continue + fi + + while IFS= read -r issue_num; do + [[ "$issue_num" =~ ^[0-9]+$ ]] || continue + if ! pgrep -fal "issue-${issue_num}|Issue #${issue_num}:" >/dev/null; then + total=$((total + 1)) + fi + done <<<"$queued_numbers" + done < <(jq -r '.initialized_repos[] | select(.pulse == true and (.local_only // false) == false and .slug != "") | .slug' "$repos_json" 2>/dev/null) + + echo "$total" + return 0 +} + +####################################### +# Launch validation gate for pulse dispatches (t1453) +# +# Arguments: +# $1 - issue number +# $2 - repo slug (owner/repo) +# $3 - optional grace timeout in seconds +# +# Exit codes: +# 0 - worker launch appears valid (process observed, no CLI usage output marker) +# 1 - launch invalid (no process within grace window or usage output detected) +####################################### +check_worker_launch() { + local issue_number="$1" + local repo_slug="$2" + local grace_seconds="${3:-$PULSE_LAUNCH_GRACE_SECONDS}" + + if [[ ! "$issue_number" =~ ^[0-9]+$ ]] || [[ -z "$repo_slug" ]]; then + echo "[pulse-wrapper] check_worker_launch: invalid arguments issue='$issue_number' repo='$repo_slug'" >>"$LOGFILE" + return 1 + fi + [[ "$grace_seconds" =~ ^[0-9]+$ ]] || grace_seconds="$PULSE_LAUNCH_GRACE_SECONDS" + if [[ "$grace_seconds" -lt 1 ]]; then + grace_seconds=1 + fi + + local safe_slug + safe_slug=$(echo "$repo_slug" | tr '/:' '--') + local -a log_candidates=( + "/tmp/pulse-${safe_slug}-${issue_number}.log" + "/tmp/pulse-${issue_number}.log" + ) + + local elapsed=0 + local poll_seconds=2 + while [[ "$elapsed" -lt "$grace_seconds" ]]; do + if pgrep -fal "issue-${issue_number}|Issue #${issue_number}:" >/dev/null; then + local candidate + for candidate in "${log_candidates[@]}"; do + if [[ -f "$candidate" ]] && rg -q '^opencode run \[message\.\.\]|^run opencode with a message|^Options:' "$candidate"; then + echo "[pulse-wrapper] Launch validation failed for issue #${issue_number} (${repo_slug}) — CLI usage output detected in ${candidate}" >>"$LOGFILE" + return 1 + fi + done + return 0 + fi + sleep "$poll_seconds" + elapsed=$((elapsed + poll_seconds)) + done + + echo "[pulse-wrapper] Launch validation failed for issue #${issue_number} (${repo_slug}) — no active worker process within ${grace_seconds}s" >>"$LOGFILE" + return 1 +} + +####################################### +# Enforce utilization invariants post-pulse (t1453) +# +# Invariant: keep dispatching pulse cycles until either: +# - active workers >= MAX_WORKERS, OR +# - no runnable work remains in scope +# +# Launch validation integration: +# queued issues with no active worker are treated as launch failures, +# which trigger an immediate backfill cycle. +####################################### +enforce_utilization_invariants() { + local attempts=0 + local max_attempts="$PULSE_BACKFILL_MAX_ATTEMPTS" + + while [[ "$attempts" -lt "$max_attempts" ]]; do + if [[ -f "$STOP_FLAG" ]]; then + echo "[pulse-wrapper] Stop flag present during utilization enforcement — exiting" >>"$LOGFILE" + return 0 + fi + + local max_workers active_workers runnable_count queued_without_worker + max_workers=$(get_max_workers_target) + active_workers=$(count_active_workers) + runnable_count=$(count_runnable_candidates) + queued_without_worker=$(count_queued_without_worker) + + [[ "$max_workers" =~ ^[0-9]+$ ]] || max_workers=1 + [[ "$active_workers" =~ ^[0-9]+$ ]] || active_workers=0 + [[ "$runnable_count" =~ ^[0-9]+$ ]] || runnable_count=0 + [[ "$queued_without_worker" =~ ^[0-9]+$ ]] || queued_without_worker=0 + + if [[ "$active_workers" -ge "$max_workers" && "$queued_without_worker" -eq 0 ]]; then + echo "[pulse-wrapper] Utilization invariant satisfied: active workers ${active_workers}/${max_workers}" >>"$LOGFILE" + return 0 + fi + + if [[ "$runnable_count" -eq 0 && "$queued_without_worker" -eq 0 ]]; then + echo "[pulse-wrapper] Utilization invariant satisfied: no runnable work remains (active ${active_workers}/${max_workers})" >>"$LOGFILE" + return 0 + fi + + attempts=$((attempts + 1)) + echo "[pulse-wrapper] Backfill attempt ${attempts}/${max_attempts}: active=${active_workers}/${max_workers}, runnable=${runnable_count}, queued_without_worker=${queued_without_worker}" >>"$LOGFILE" + + # Refresh prompt state before each backfill cycle so pulse sees latest context. + prefetch_state || true + run_pulse + done + + echo "[pulse-wrapper] Reached backfill attempt cap (${max_attempts}) before utilization invariant converged" >>"$LOGFILE" + return 0 +} + ####################################### # Main # @@ -1719,6 +1929,7 @@ main() { fi run_pulse + enforce_utilization_invariants return 0 }