diff --git a/.agents/scripts/commands/pulse.md b/.agents/scripts/commands/pulse.md index 84a4cb2676..cf968bf48c 100644 --- a/.agents/scripts/commands/pulse.md +++ b/.agents/scripts/commands/pulse.md @@ -40,8 +40,9 @@ This is idempotent — safe to run even when PATH is already correct. All subseq # Max workers (dynamic, from available RAM) MAX_WORKERS=$(cat ~/.aidevops/logs/pulse-max-workers 2>/dev/null || echo 4) -# Count all full-loop workers (issue + PR advancement), excluding supervisor /pulse noise -WORKER_COUNT=$(ps axo command | grep '\.opencode run' | grep '/full-loop' | grep -v '/pulse' | grep -v 'Supervisor Pulse' | grep -v grep | wc -l | tr -d ' ') +# Count all full-loop workers using the same matcher as per-repo caps (unifies global capacity counting) +source ~/.aidevops/agents/scripts/pulse-wrapper.sh +WORKER_COUNT=$(list_active_worker_processes | wc -l | tr -d ' ') AVAILABLE=$((MAX_WORKERS - WORKER_COUNT)) # Priority-class allocations (t1423) — read from pre-fetched state @@ -556,6 +557,24 @@ ISSUE_DISPATCH_BUDGET=$(((AVAILABLE * NEW_ISSUE_DISPATCH_PCT) / 100)) If budget is exhausted, stop opening new issue workers and continue PR advancement work. 1. Skip if a worker is already running for it locally (check `ps` output for the issue number) +1.5. **Apply per-repo worker cap before dispatch:** default `MAX_WORKERS_PER_REPO=5` (override via env var only when you have a clear reason). If the target repo already has `MAX_WORKERS_PER_REPO` active workers, skip dispatch for that repo this cycle and continue with other repos. + +```bash +# Source once per pulse run +source ~/.aidevops/agents/scripts/pulse-wrapper.sh + +MAX_WORKERS_PER_REPO=${MAX_WORKERS_PER_REPO:-5} +ACTIVE_FOR_REPO=$(list_active_worker_processes | awk -v path="" ' + BEGIN { esc=path; gsub(/[][(){}.^$*+?|\\]/, "\\\\&", esc) } + $0 ~ ("--dir[[:space:]]+" esc "([[:space:]]|$)") { count++ } + END { print count + 0 } +') +if [[ "$ACTIVE_FOR_REPO" -ge "$MAX_WORKERS_PER_REPO" ]]; then + echo "Repo at worker cap (${ACTIVE_FOR_REPO}/${MAX_WORKERS_PER_REPO}) — skipping dispatch for this cycle" + continue +fi +``` + 2. Skip if an open PR already exists for it (check PR list) 3. Treat labels as hints, not gates. `status:queued`, `status:in-progress`, and `status:in-review` suggest active work, but verify with evidence (active worker, recent PR updates, recent commits) before skipping. 4. Treat unassigned + non-blocked issues as available by default. `status:available` is optional metadata, not a requirement. diff --git a/.agents/scripts/pulse-wrapper.sh b/.agents/scripts/pulse-wrapper.sh index d61b25849a..bafb9c0d7d 100755 --- a/.agents/scripts/pulse-wrapper.sh +++ b/.agents/scripts/pulse-wrapper.sh @@ -956,9 +956,21 @@ This PR modifies \`.github/workflows/\` files but the GitHub OAuth token used by # # Output: worker summary to stdout (appended to STATE_FILE by caller) ####################################### +list_active_worker_processes() { + ps axo pid,etime,command | awk ' + /\/full-loop/ && + $0 !~ /(^|[[:space:]])\/pulse([[:space:]]|$)/ && + $0 !~ /Supervisor Pulse/ && + $0 ~ /(^|[[:space:]\/])\.?opencode([[:space:]]|$)/ { + print + } + ' + return 0 +} + prefetch_active_workers() { local worker_lines - worker_lines=$(ps axo pid,etime,command | grep '/full-loop' | grep '[.]opencode' || true) + worker_lines=$(list_active_worker_processes || true) echo "" echo "# Active Workers" @@ -1916,19 +1928,7 @@ normalize_active_issue_assignments() { ####################################### count_active_workers() { local count - count=$(ps axo command | awk ' - index($0, ".opencode run") > 0 && - index($0, "/full-loop") > 0 && - !( - $0 ~ /(^|[[:space:]])--role([=[:space:]])pulse([[:space:]]|$)/ && - $0 ~ /(^|[[:space:]])--session-key([=[:space:]])supervisor-pulse([[:space:]]|$)/ - ) { - count++ - } - END { - print count + 0 - } - ') || count=0 + count=$(list_active_worker_processes | wc -l | tr -d ' ') || count=0 echo "$count" return 0 } @@ -1979,8 +1979,13 @@ has_worker_for_repo_issue() { fi local matches - matches=$(ps axo command | awk -v issue="$issue_number" -v path="$repo_path" ' - index($0, path) > 0 && ($0 ~ ("issue-" issue "([^0-9]|$)") || $0 ~ ("Issue #" issue "([^0-9]|$)")) { count++ } + matches=$(list_active_worker_processes | awk -v issue="$issue_number" -v path="$repo_path" ' + BEGIN { + esc = path + gsub(/[][(){}.^$*+?|\\]/, "\\\\&", esc) + } + $0 ~ ("--dir[[:space:]]+" esc "([[:space:]]|$)") && + ($0 ~ ("issue-" issue "([^0-9]|$)") || $0 ~ ("Issue #" issue "([^0-9]|$)")) { count++ } END { print count + 0 } ') || matches=0 [[ "$matches" =~ ^[0-9]+$ ]] || matches=0 diff --git a/.agents/scripts/tests/test-pulse-wrapper-worker-detection.sh b/.agents/scripts/tests/test-pulse-wrapper-worker-detection.sh new file mode 100644 index 0000000000..188ca8437d --- /dev/null +++ b/.agents/scripts/tests/test-pulse-wrapper-worker-detection.sh @@ -0,0 +1,166 @@ +#!/usr/bin/env bash + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" || exit +PULSE_WRAPPER_SCRIPT="${SCRIPT_DIR}/../pulse-wrapper.sh" + +readonly TEST_RED='\033[0;31m' +readonly TEST_GREEN='\033[0;32m' +readonly TEST_RESET='\033[0m' + +TESTS_RUN=0 +TESTS_FAILED=0 + +TEST_ROOT="" +PS_FIXTURE_FILE="" + +print_result() { + local test_name="$1" + local passed="$2" + local message="${3:-}" + TESTS_RUN=$((TESTS_RUN + 1)) + + if [[ "$passed" -eq 0 ]]; then + printf '%bPASS%b %s\n' "$TEST_GREEN" "$TEST_RESET" "$test_name" + return 0 + fi + + printf '%bFAIL%b %s\n' "$TEST_RED" "$TEST_RESET" "$test_name" + if [[ -n "$message" ]]; then + printf ' %s\n' "$message" + fi + TESTS_FAILED=$((TESTS_FAILED + 1)) + return 0 +} + +setup_test_env() { + TEST_ROOT=$(mktemp -d) + PS_FIXTURE_FILE="${TEST_ROOT}/ps-fixture.txt" + export HOME="${TEST_ROOT}/home" + mkdir -p "${HOME}/.aidevops/logs" + + export REPOS_JSON="${TEST_ROOT}/repos.json" + cat >"${REPOS_JSON}" <<'JSON' +{ + "initialized_repos": [ + { + "slug": "marcusquinn/aidevops", + "path": "/tmp/aidevops" + } + ] +} +JSON + + return 0 +} + +teardown_test_env() { + if [[ -n "$TEST_ROOT" && -d "$TEST_ROOT" ]]; then + rm -rf "$TEST_ROOT" + fi + return 0 +} + +set_ps_fixture() { + local content="$1" + printf '%s\n' "$content" >"$PS_FIXTURE_FILE" + return 0 +} + +ps() { + if [[ "${1:-}" == "axo" && "${2:-}" == "pid,etime,command" ]]; then + cat "$PS_FIXTURE_FILE" + return 0 + fi + command ps "$@" + return 0 +} + +test_counts_plain_and_dot_prefixed_opencode_workers() { + # Line 125: supervisor /pulse — excluded by standalone /pulse filter + # Line 126: worker whose session-key contains /pulse-related (not standalone) — must be counted + set_ps_fixture "123 00:10 opencode run --dir /tmp/aidevops --title Issue #4342 \"/full-loop Implement issue #4342\" +124 00:11 /Users/test/.opencode/bin/opencode run --dir /tmp/aidevops --title Issue #4343 \"/full-loop Implement issue #4343\" +125 00:20 opencode run --dir /tmp/aidevops --title Supervisor Pulse \"/pulse\" +126 00:05 opencode run --dir /tmp/aidevops --session-key issue-4344 --title Issue #4344 \"/full-loop Implement issue #4344 -- fix /pulse-related bug\"" + + local count + count=$(count_active_workers) + # Lines 123, 124, 126 are workers; line 125 is the supervisor /pulse (excluded) + if [[ "$count" != "3" ]]; then + print_result "count_active_workers excludes supervisor /pulse but counts worker with /pulse in args" 1 "Expected 3, got ${count}" + return 0 + fi + + print_result "count_active_workers excludes supervisor /pulse but counts worker with /pulse in args" 0 + return 0 +} + +test_repo_issue_detection_uses_filtered_worker_list() { + set_ps_fixture "211 00:31 opencode run --dir /tmp/aidevops --session-key issue-4342 --title Issue #4342: fix \"/full-loop Implement issue #4342\" +212 00:31 opencode run --dir /tmp/other --session-key issue-4342 --title Issue #4342: other \"/full-loop Implement issue #4342\" +213 00:05 opencode run --dir /tmp/aidevops --title Supervisor Pulse \"/pulse\" +214 00:12 opencode run --dir /tmp/aidevops-tools --session-key issue-4342 --title Issue #4342: tools \"/full-loop Implement issue #4342\"" + + if ! has_worker_for_repo_issue "4342" "marcusquinn/aidevops"; then + print_result "has_worker_for_repo_issue matches scoped worker process" 1 "Expected worker match for repo issue" + return 0 + fi + + if has_worker_for_repo_issue "9999" "marcusquinn/aidevops"; then + print_result "has_worker_for_repo_issue rejects unrelated issues" 1 "Expected no worker match for issue 9999" + return 0 + fi + + # Line 214 uses /tmp/aidevops-tools — a prefix of /tmp/aidevops — must NOT match + # Add a second repo entry for aidevops-tools to verify exact path matching + cat >"${REPOS_JSON}" <<'JSON' +{ + "initialized_repos": [ + { + "slug": "marcusquinn/aidevops", + "path": "/tmp/aidevops" + }, + { + "slug": "marcusquinn/aidevops-tools", + "path": "/tmp/aidevops-tools" + } + ] +} +JSON + # Worker 214 is for aidevops-tools, not aidevops — should not count for aidevops + local count_aidevops + count_aidevops=$(list_active_worker_processes | awk -v path="/tmp/aidevops" ' + BEGIN { esc = path; gsub(/[][(){}.^$*+?|\\]/, "\\\\&", esc) } + $0 ~ ("--dir[[:space:]]+" esc "([[:space:]]|$)") { count++ } + END { print count + 0 } + ') + if [[ "$count_aidevops" != "1" ]]; then + print_result "has_worker_for_repo_issue does not match prefix-sibling repo path" 1 "Expected 1 match for /tmp/aidevops, got ${count_aidevops}" + return 0 + fi + + print_result "has_worker_for_repo_issue matches scoped worker process" 0 + print_result "has_worker_for_repo_issue rejects unrelated issues" 0 + print_result "has_worker_for_repo_issue does not match prefix-sibling repo path" 0 + return 0 +} + +main() { + trap teardown_test_env EXIT + setup_test_env + # shellcheck source=/dev/null + source "$PULSE_WRAPPER_SCRIPT" + + test_counts_plain_and_dot_prefixed_opencode_workers + test_repo_issue_detection_uses_filtered_worker_list + + printf '\nRan %s tests, %s failed.\n' "$TESTS_RUN" "$TESTS_FAILED" + if [[ "$TESTS_FAILED" -gt 0 ]]; then + return 1 + fi + return 0 +} + +main "$@"