Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 189 additions & 15 deletions .agents/scripts/pulse-wrapper.sh
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@
QUALITY_DEBT_CAP_PCT="${QUALITY_DEBT_CAP_PCT:-$(config_get "orchestration.quality_debt_cap_pct" "30")}" # % cap for quality-debt dispatch share
PULSE_BACKFILL_MAX_ATTEMPTS="${PULSE_BACKFILL_MAX_ATTEMPTS:-3}" # Additional pulse passes when below utilization target (t1453)
PULSE_LAUNCH_GRACE_SECONDS="${PULSE_LAUNCH_GRACE_SECONDS:-20}" # Grace window for worker process to appear after dispatch (t1453)
PRE_RUN_STAGE_TIMEOUT="${PRE_RUN_STAGE_TIMEOUT:-600}" # 10 min cap per pre-run stage (cleanup/prefetch)
PULSE_PREFETCH_PR_LIMIT="${PULSE_PREFETCH_PR_LIMIT:-200}" # Open PR list window per repo for pre-fetched state
PULSE_PREFETCH_ISSUE_LIMIT="${PULSE_PREFETCH_ISSUE_LIMIT:-200}" # Open issue list window per repo for pre-fetched state
PULSE_RUNNABLE_PR_LIMIT="${PULSE_RUNNABLE_PR_LIMIT:-200}" # Open PR sample size for runnable-candidate counting
PULSE_RUNNABLE_ISSUE_LIMIT="${PULSE_RUNNABLE_ISSUE_LIMIT:-200}" # Open issue sample size for runnable-candidate counting
PULSE_QUEUED_SCAN_LIMIT="${PULSE_QUEUED_SCAN_LIMIT:-200}" # Queued-issue scan window per repo

# Process guard limits (t1398)
CHILD_RSS_LIMIT_KB="${CHILD_RSS_LIMIT_KB:-2097152}" # 2 GB default — kill child if RSS exceeds this
Expand All @@ -110,6 +116,12 @@
fi
PULSE_BACKFILL_MAX_ATTEMPTS=$(_validate_int PULSE_BACKFILL_MAX_ATTEMPTS "$PULSE_BACKFILL_MAX_ATTEMPTS" 3 0)
PULSE_LAUNCH_GRACE_SECONDS=$(_validate_int PULSE_LAUNCH_GRACE_SECONDS "$PULSE_LAUNCH_GRACE_SECONDS" 20 5)
PRE_RUN_STAGE_TIMEOUT=$(_validate_int PRE_RUN_STAGE_TIMEOUT "$PRE_RUN_STAGE_TIMEOUT" 600 30)
PULSE_PREFETCH_PR_LIMIT=$(_validate_int PULSE_PREFETCH_PR_LIMIT "$PULSE_PREFETCH_PR_LIMIT" 200 1)
PULSE_PREFETCH_ISSUE_LIMIT=$(_validate_int PULSE_PREFETCH_ISSUE_LIMIT "$PULSE_PREFETCH_ISSUE_LIMIT" 200 1)
PULSE_RUNNABLE_PR_LIMIT=$(_validate_int PULSE_RUNNABLE_PR_LIMIT "$PULSE_RUNNABLE_PR_LIMIT" 200 1)
PULSE_RUNNABLE_ISSUE_LIMIT=$(_validate_int PULSE_RUNNABLE_ISSUE_LIMIT "$PULSE_RUNNABLE_ISSUE_LIMIT" 200 1)
PULSE_QUEUED_SCAN_LIMIT=$(_validate_int PULSE_QUEUED_SCAN_LIMIT "$PULSE_QUEUED_SCAN_LIMIT" 200 1)
CHILD_RSS_LIMIT_KB=$(_validate_int CHILD_RSS_LIMIT_KB "$CHILD_RSS_LIMIT_KB" 2097152 1)
CHILD_RUNTIME_LIMIT=$(_validate_int CHILD_RUNTIME_LIMIT "$CHILD_RUNTIME_LIMIT" 1800 1)
SHELLCHECK_RSS_LIMIT_KB=$(_validate_int SHELLCHECK_RSS_LIMIT_KB "$SHELLCHECK_RSS_LIMIT_KB" 1048576 1)
Expand All @@ -130,6 +142,7 @@
REPOS_JSON="${REPOS_JSON:-${HOME}/.config/aidevops/repos.json}"
STATE_FILE="${HOME}/.aidevops/logs/pulse-state.txt"
QUEUE_METRICS_FILE="${HOME}/.aidevops/logs/pulse-queue-metrics"
SCOPE_FILE="${HOME}/.aidevops/logs/pulse-scope-repos"

if [[ ! -x "$HEADLESS_RUNTIME_HELPER" ]]; then
printf '[pulse-wrapper] ERROR: headless runtime helper is missing or not executable: %s (SCRIPT_DIR=%s)\n' "$HEADLESS_RUNTIME_HELPER" "$SCRIPT_DIR" >&2
Expand Down Expand Up @@ -242,7 +255,7 @@
local pr_json
pr_json=$(gh pr list --repo "$slug" --state open \
--json number,title,reviewDecision,statusCheckRollup,updatedAt,headRefName,createdAt \
--limit 100 2>/dev/null) || pr_json="[]"
--limit "$PULSE_PREFETCH_PR_LIMIT" 2>/dev/null) || pr_json="[]"

local pr_count
pr_count=$(echo "$pr_json" | jq 'length')
Expand Down Expand Up @@ -290,7 +303,7 @@
local issue_json
issue_json=$(gh issue list --repo "$slug" --state open \
--json number,title,labels,updatedAt,assignees \
--limit 50 2>/dev/null) || issue_json="[]"
--limit "$PULSE_PREFETCH_ISSUE_LIMIT" 2>/dev/null) || issue_json="[]"

# Remove issues with supervisor, contributor, persistent, or quality-review labels
local filtered_json
Expand Down Expand Up @@ -358,6 +371,7 @@
local scope_slugs
scope_slugs=$(echo "$repo_entries" | cut -d'|' -f1 | grep . | paste -sd ',' -)
export PULSE_SCOPE_REPOS="$scope_slugs"
echo "$scope_slugs" >"$SCOPE_FILE"
echo "[pulse-wrapper] PULSE_SCOPE_REPOS=${scope_slugs}" >>"$LOGFILE"

local repo_count
Expand Down Expand Up @@ -1253,6 +1267,70 @@
return 0
}

#######################################
# Run a stage with a wall-clock timeout
#
# Arguments:
# $1 - stage name (for logs)
# $2 - timeout seconds
# $3... - command/function to execute
#
# Exit codes:
# 0 - stage completed successfully
# 124 - stage timed out and was killed
# else- stage exited with command exit code
#######################################
run_stage_with_timeout() {
local stage_name="$1"
local timeout_seconds="$2"
shift 2

if [[ -z "$stage_name" ]] || [[ "$#" -lt 1 ]]; then
echo "[pulse-wrapper] run_stage_with_timeout: invalid arguments" >>"$LOGFILE"
return 1
fi
[[ "$timeout_seconds" =~ ^[0-9]+$ ]] || timeout_seconds="$PRE_RUN_STAGE_TIMEOUT"
if [[ "$timeout_seconds" -lt 1 ]]; then
timeout_seconds=1
fi

local stage_start
stage_start=$(date +%s)
echo "[pulse-wrapper] Stage start: ${stage_name} (timeout ${timeout_seconds}s)" >>"$LOGFILE"

"$@" &
local stage_pid=$!

while kill -0 "$stage_pid" 2>/dev/null; do
local now
now=$(date +%s)
local elapsed=$((now - stage_start))
if [[ "$elapsed" -gt "$timeout_seconds" ]]; then
echo "[pulse-wrapper] Stage timeout: ${stage_name} exceeded ${timeout_seconds}s (pid ${stage_pid})" >>"$LOGFILE"
_kill_tree "$stage_pid" || true
sleep 2
if kill -0 "$stage_pid" 2>/dev/null; then
_force_kill_tree "$stage_pid" || true
fi
wait "$stage_pid" 2>/dev/null || true
return 124
fi
sleep 2
done

wait "$stage_pid"
local stage_status=$?
if [[ "$stage_status" -ne 0 ]]; then
echo "[pulse-wrapper] Stage failed: ${stage_name} exited with ${stage_status}" >>"$LOGFILE"
return "$stage_status"
fi

local stage_end
stage_end=$(date +%s)
echo "[pulse-wrapper] Stage complete: ${stage_name} (${stage_status}, $((stage_end - stage_start))s)" >>"$LOGFILE"
return 0
}

#######################################
# Run the pulse — with internal watchdog timeout (t1397, t1398, t1398.3, GH#2958)
#
Expand Down Expand Up @@ -1689,6 +1767,64 @@
return 0
}

#######################################
# Resolve managed repo path from slug
# Arguments:
# $1 - repo slug (owner/repo)
# Returns: path via stdout (empty if not found)
#######################################
get_repo_path_by_slug() {
local repo_slug="$1"
if [[ -z "$repo_slug" ]] || [[ ! -f "$REPOS_JSON" ]]; then
echo ""
return 0
fi

local repo_path
repo_path=$(jq -r --arg slug "$repo_slug" '.initialized_repos[] | select(.slug == $slug) | .path' "$REPOS_JSON" 2>/dev/null | head -n 1)
if [[ "$repo_path" == "null" ]]; then
repo_path=""
fi
echo "$repo_path"
return 0
}

#######################################
# Check if a worker exists for a specific repo+issue pair
# Arguments:
# $1 - issue number
# $2 - repo slug (owner/repo)
# Exit codes:
# 0 - matching worker exists
# 1 - no matching worker
#######################################
has_worker_for_repo_issue() {
local issue_number="$1"
local repo_slug="$2"

if [[ ! "$issue_number" =~ ^[0-9]+$ ]] || [[ -z "$repo_slug" ]]; then
return 1
fi

local repo_path
repo_path=$(get_repo_path_by_slug "$repo_slug")
if [[ -z "$repo_path" ]]; then
return 1
fi

local matches
matches=$(ps axo command | awk -v issue="$issue_number" -v path="$repo_path" '
index($0, path) > 0 && ($0 ~ ("issue-" issue "([^0-9]|$)") || $0 ~ ("Issue #" issue "([^0-9]|$)")) { count++ }
END { print count + 0 }
') || matches=0
[[ "$matches" =~ ^[0-9]+$ ]] || matches=0

if [[ "$matches" -gt 0 ]]; then
return 0
fi
return 1
}

#######################################
# Append adaptive queue-governor guidance to pre-fetched state
#
Expand All @@ -1702,10 +1838,35 @@
fi

local total_prs total_issues ready_prs failing_prs
total_prs=$(awk '/^### Open PRs \([0-9]+\)/ { line=$0; gsub(/[^0-9]/, "", line); sum+=line } END { print sum+0 }' "$STATE_FILE")
total_issues=$(awk '/^### Open Issues \([0-9]+\)/ { line=$0; gsub(/[^0-9]/, "", line); sum+=line } END { print sum+0 }' "$STATE_FILE")
ready_prs=$(rg -c '\[checks: PASS\].*\[review: APPROVED\]' "$STATE_FILE" 2>/dev/null || echo "0")
failing_prs=$(rg -c '\[checks: FAIL\]|\[review: CHANGES_REQUESTED\]' "$STATE_FILE" 2>/dev/null || echo "0")
total_prs=0
total_issues=0
ready_prs=0
failing_prs=0

while IFS='|' read -r slug _path; do
[[ -n "$slug" ]] || continue

local pr_json
pr_json=$(gh pr list --repo "$slug" --state open --json reviewDecision,statusCheckRollup --limit "$PULSE_RUNNABLE_PR_LIMIT" 2>/dev/null) || pr_json="[]"
local repo_pr_total repo_ready repo_failing
repo_pr_total=$(echo "$pr_json" | jq 'length' 2>/dev/null) || repo_pr_total=0
repo_ready=$(echo "$pr_json" | jq '[.[] | select(.reviewDecision == "APPROVED" and ((.statusCheckRollup // []) | length > 0) and ((.statusCheckRollup // []) | all((.conclusion // .state) == "SUCCESS")))] | length' 2>/dev/null) || repo_ready=0
repo_failing=$(echo "$pr_json" | jq '[.[] | select(.reviewDecision == "CHANGES_REQUESTED" or ((.statusCheckRollup // []) | any((.conclusion // .state) == "FAILURE")))] | length' 2>/dev/null) || repo_failing=0

local issue_json repo_issue_total
issue_json=$(gh issue list --repo "$slug" --state open --json number --limit "$PULSE_RUNNABLE_ISSUE_LIMIT" 2>/dev/null) || issue_json="[]"
repo_issue_total=$(echo "$issue_json" | jq 'length' 2>/dev/null) || repo_issue_total=0

Check warning on line 1858 in .agents/scripts/pulse-wrapper.sh

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of using the literal 'length' 5 times.

See more on https://sonarcloud.io/project/issues?id=marcusquinn_aidevops&issues=AZzkV7dEXr6ssy_7F4DA&open=AZzkV7dEXr6ssy_7F4DA&pullRequest=4293

[[ "$repo_pr_total" =~ ^[0-9]+$ ]] || repo_pr_total=0
[[ "$repo_ready" =~ ^[0-9]+$ ]] || repo_ready=0
[[ "$repo_failing" =~ ^[0-9]+$ ]] || repo_failing=0
[[ "$repo_issue_total" =~ ^[0-9]+$ ]] || repo_issue_total=0

total_prs=$((total_prs + repo_pr_total))
total_issues=$((total_issues + repo_issue_total))
ready_prs=$((ready_prs + repo_ready))
failing_prs=$((failing_prs + repo_failing))
done < <(jq -r '.initialized_repos[] | select(.pulse == true and (.local_only // false) == false and .slug != "") | "\(.slug)|\(.path)"' "$REPOS_JSON" 2>/dev/null)

[[ "$total_prs" =~ ^[0-9]+$ ]] || total_prs=0
[[ "$total_issues" =~ ^[0-9]+$ ]] || total_issues=0
Expand Down Expand Up @@ -1824,19 +1985,19 @@
[[ -n "$slug" ]] || continue

local issue_json
issue_json=$(gh issue list --repo "$slug" --state open --json assignees,labels --limit 100 2>/dev/null) || issue_json="[]"
issue_json=$(gh issue list --repo "$slug" --state open --json assignees,labels --limit "$PULSE_RUNNABLE_ISSUE_LIMIT" 2>/dev/null) || issue_json="[]"
local issue_count
issue_count=$(echo "$issue_json" | jq '[.[] | select((.assignees | length) == 0 and (.labels | map(.name) | index("status:blocked") | not))] | length' 2>/dev/null) || issue_count=0
[[ "$issue_count" =~ ^[0-9]+$ ]] || issue_count=0

local pr_json
pr_json=$(gh pr list --repo "$slug" --state open --json reviewDecision,statusCheckRollup --limit 100 2>/dev/null) || pr_json="[]"
pr_json=$(gh pr list --repo "$slug" --state open --json reviewDecision,statusCheckRollup --limit "$PULSE_RUNNABLE_PR_LIMIT" 2>/dev/null) || pr_json="[]"
local pr_count
pr_count=$(echo "$pr_json" | jq '[.[] | select(.reviewDecision == "CHANGES_REQUESTED" or ((.statusCheckRollup // []) | any((.conclusion // .state) == "FAILURE")))] | length' 2>/dev/null) || pr_count=0
[[ "$pr_count" =~ ^[0-9]+$ ]] || pr_count=0

total=$((total + issue_count + pr_count))
done < <(jq -r '.initialized_repos[] | select(.pulse == true and (.local_only // false) == false and .slug != "") | "\(.slug)|\(.path)"' "$repos_json" 2>/dev/null)

Check warning on line 2000 in .agents/scripts/pulse-wrapper.sh

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of using the literal '.initialized_repos[] | select(.pulse == true and (.local_only // false) == false and .slug != \"\") | \"\(.slug)|\(.path)\"' 4 times.

See more on https://sonarcloud.io/project/issues?id=marcusquinn_aidevops&issues=AZzkV7dEXr6ssy_7F4DB&open=AZzkV7dEXr6ssy_7F4DB&pullRequest=4293

echo "$total"
return 0
Expand All @@ -1859,14 +2020,14 @@
while IFS= read -r slug; do
[[ -n "$slug" ]] || continue
local queued_numbers
queued_numbers=$(gh issue list --repo "$slug" --state open --label "status:queued" --json number --jq '.[].number' --limit 100 2>/dev/null) || queued_numbers=""
queued_numbers=$(gh issue list --repo "$slug" --state open --label "status:queued" --json number --jq '.[].number' --limit "$PULSE_QUEUED_SCAN_LIMIT" 2>/dev/null) || queued_numbers=""
if [[ -z "$queued_numbers" ]]; then
continue
fi

while IFS= read -r issue_num; do
[[ "$issue_num" =~ ^[0-9]+$ ]] || continue
if ! pgrep -fal "issue-${issue_num}|Issue #${issue_num}:" >/dev/null; then
if ! has_worker_for_repo_issue "$issue_num" "$slug"; then
total=$((total + 1))
fi
done <<<"$queued_numbers"
Expand Down Expand Up @@ -1912,7 +2073,7 @@
local elapsed=0
local poll_seconds=2
while [[ "$elapsed" -lt "$grace_seconds" ]]; do
if pgrep -fal "issue-${issue_number}|Issue #${issue_number}:" >/dev/null; then
if has_worker_for_repo_issue "$issue_number" "$repo_slug"; then
local candidate
for candidate in "${log_candidates[@]}"; do
if [[ -f "$candidate" ]] && rg -q '^opencode run \[message\.\.\]|^run opencode with a message|^Options:' "$candidate"; then
Expand Down Expand Up @@ -2013,9 +2174,9 @@
# run_pulse() overwrites with the opencode PID for watchdog tracking.
echo "$$" >"$PIDFILE"

cleanup_orphans
cleanup_worktrees
cleanup_stashes
run_stage_with_timeout "cleanup_orphans" "$PRE_RUN_STAGE_TIMEOUT" cleanup_orphans || true
run_stage_with_timeout "cleanup_worktrees" "$PRE_RUN_STAGE_TIMEOUT" cleanup_worktrees || true
run_stage_with_timeout "cleanup_stashes" "$PRE_RUN_STAGE_TIMEOUT" cleanup_stashes || true
calculate_max_workers
calculate_priority_allocations
check_session_count >/dev/null
Expand All @@ -2025,7 +2186,20 @@
# comment bodies. Output appended to STATE_FILE for the pulse agent.
prefetch_contribution_watch

prefetch_state
if ! run_stage_with_timeout "prefetch_state" "$PRE_RUN_STAGE_TIMEOUT" prefetch_state; then
echo "[pulse-wrapper] prefetch_state did not complete successfully — aborting this cycle to avoid stale dispatch decisions" >>"$LOGFILE"
rm -f "$PIDFILE"
return 0
fi

if [[ -f "$SCOPE_FILE" ]]; then
local persisted_scope
persisted_scope=$(cat "$SCOPE_FILE" 2>/dev/null || echo "")
if [[ -n "$persisted_scope" ]]; then
export PULSE_SCOPE_REPOS="$persisted_scope"
echo "[pulse-wrapper] Restored PULSE_SCOPE_REPOS from ${SCOPE_FILE}" >>"$LOGFILE"
fi
fi

# Re-check stop flag immediately before run_pulse() — a stop may have
# been issued during the prefetch/cleanup phase above (t2943)
Expand Down
Loading