From 2a879425a0708c14127fcf38627613a8ad2460ef Mon Sep 17 00:00:00 2001 From: Kazuki Yamada Date: Mon, 6 Apr 2026 00:48:48 +0900 Subject: [PATCH 1/5] perf(core): Reduce worker thread contention for faster pipeline execution Add maxWorkerThreads option to WorkerOptions for explicit thread count capping, then use it to reduce CPU contention when metrics and security worker pools run concurrently during the pipeline overlap phase. - Metrics pool: capped at (processConcurrency - 1) - Security pool: capped at floor(processConcurrency / 2) On a 4-core machine this reduces concurrent threads from 8 (4+4) to 5 (3+2), avoiding context-switching overhead during gpt-tokenizer warmup. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/core/metrics/calculateMetrics.ts | 14 ++++++++++++-- src/core/security/securityCheck.ts | 12 +++++++----- src/shared/processConcurrency.ts | 16 ++++++++++++---- tests/core/security/securityCheck.test.ts | 1 + tests/shared/processConcurrency.test.ts | 20 ++++++++++++++++++++ 5 files changed, 52 insertions(+), 11 deletions(-) diff --git a/src/core/metrics/calculateMetrics.ts b/src/core/metrics/calculateMetrics.ts index d20dea52c..f42873448 100644 --- a/src/core/metrics/calculateMetrics.ts +++ b/src/core/metrics/calculateMetrics.ts @@ -1,5 +1,10 @@ import type { RepomixConfigMerged } from '../../config/configSchema.js'; -import { getWorkerThreadCount, initTaskRunner, type TaskRunner } from '../../shared/processConcurrency.js'; +import { + getProcessConcurrency, + getWorkerThreadCount, + initTaskRunner, + type TaskRunner, +} from '../../shared/processConcurrency.js'; import type { RepomixProgressCallback } from '../../shared/types.js'; import type { ProcessedFile } from '../file/fileTypes.js'; import type { GitDiffResult } from '../git/gitDiffHandle.js'; @@ -34,13 +39,18 @@ export interface MetricsTaskRunnerWithWarmup { * output generation). */ export const createMetricsTaskRunner = (numOfTasks: number, encoding: TokenEncoding): MetricsTaskRunnerWithWarmup => { + // Cap metrics workers at (processConcurrency - 1) to leave CPU headroom for the + // security worker pool that runs concurrently during the pipeline overlap phase. + const maxMetricsWorkers = Math.max(1, getProcessConcurrency() - 1); + const taskRunner = initTaskRunner({ numOfTasks, workerType: 'calculateMetrics', runtime: 'worker_threads', + maxWorkerThreads: maxMetricsWorkers, }); - const { maxThreads } = getWorkerThreadCount(numOfTasks); + const { maxThreads } = getWorkerThreadCount(numOfTasks, maxMetricsWorkers); const warmupPromise = Promise.all( Array.from({ length: maxThreads }, () => taskRunner.run({ content: '', encoding }).catch(() => 0)), ); diff --git a/src/core/security/securityCheck.ts b/src/core/security/securityCheck.ts index f1736a580..bb6d23319 100644 --- a/src/core/security/securityCheck.ts +++ b/src/core/security/securityCheck.ts @@ -1,6 +1,6 @@ import pc from 'picocolors'; import { logger } from '../../shared/logger.js'; -import { initTaskRunner } from '../../shared/processConcurrency.js'; +import { getProcessConcurrency, initTaskRunner } from '../../shared/processConcurrency.js'; import type { RepomixProgressCallback } from '../../shared/types.js'; import type { RawFile } from '../file/fileTypes.js'; import type { GitDiffResult } from '../git/gitDiffHandle.js'; @@ -74,14 +74,16 @@ export const runSecurityCheck = async ( const allItems = [...fileItems, ...gitDiffItems, ...gitLogItems]; const totalItems = allItems.length; - // NOTE: numOfTasks uses totalItems (not batches.length) intentionally. - // getWorkerThreadCount uses Math.ceil(numOfTasks / TASKS_PER_THREAD) to size the pool, - // where TASKS_PER_THREAD=100 is calibrated for fine-grained tasks. - // Passing batches.length (e.g. 2) would yield maxThreads=1, forcing sequential execution. + // Cap security workers at half the available CPU cores to reduce contention with the + // metrics worker pool that runs concurrently. The security check uses coarse-grained + // batches (BATCH_SIZE=50), so fewer workers still provide sufficient parallelism. + const maxSecurityWorkers = Math.max(1, Math.floor(getProcessConcurrency() / 2)); + const taskRunner = deps.initTaskRunner({ numOfTasks: totalItems, workerType: 'securityCheck', runtime: 'worker_threads', + maxWorkerThreads: maxSecurityWorkers, }); // Split items into batches to reduce IPC round-trips diff --git a/src/shared/processConcurrency.ts b/src/shared/processConcurrency.ts index 62756106e..2c3581bf7 100644 --- a/src/shared/processConcurrency.ts +++ b/src/shared/processConcurrency.ts @@ -12,6 +12,7 @@ export interface WorkerOptions { numOfTasks: number; workerType: WorkerType; runtime: WorkerRuntime; + maxWorkerThreads?: number; } /** @@ -45,13 +46,20 @@ export const getProcessConcurrency = (): number => { return typeof os.availableParallelism === 'function' ? os.availableParallelism() : os.cpus().length; }; -export const getWorkerThreadCount = (numOfTasks: number): { minThreads: number; maxThreads: number } => { +export const getWorkerThreadCount = ( + numOfTasks: number, + maxWorkerThreads?: number, +): { minThreads: number; maxThreads: number } => { const processConcurrency = getProcessConcurrency(); const minThreads = 1; + // Apply optional cap to limit thread count (e.g., to reduce contention with other concurrent pools) + const effectiveConcurrency = + maxWorkerThreads != null ? Math.min(processConcurrency, maxWorkerThreads) : processConcurrency; + // Limit max threads based on number of tasks - const maxThreads = Math.max(minThreads, Math.min(processConcurrency, Math.ceil(numOfTasks / TASKS_PER_THREAD))); + const maxThreads = Math.max(minThreads, Math.min(effectiveConcurrency, Math.ceil(numOfTasks / TASKS_PER_THREAD))); return { minThreads, @@ -60,8 +68,8 @@ export const getWorkerThreadCount = (numOfTasks: number): { minThreads: number; }; export const createWorkerPool = (options: WorkerOptions): Tinypool => { - const { numOfTasks, workerType, runtime = 'child_process' } = options; - const { minThreads, maxThreads } = getWorkerThreadCount(numOfTasks); + const { numOfTasks, workerType, runtime = 'child_process', maxWorkerThreads } = options; + const { minThreads, maxThreads } = getWorkerThreadCount(numOfTasks, maxWorkerThreads); // Get worker path - uses unified worker in bundled env, individual files otherwise const workerPath = getWorkerPath(workerType); diff --git a/tests/core/security/securityCheck.test.ts b/tests/core/security/securityCheck.test.ts index 22d110616..5805c8d88 100644 --- a/tests/core/security/securityCheck.test.ts +++ b/tests/core/security/securityCheck.test.ts @@ -12,6 +12,7 @@ import type { WorkerOptions } from '../../../src/shared/processConcurrency.js'; vi.mock('../../../src/shared/logger'); vi.mock('../../../src/shared/processConcurrency', () => ({ + getProcessConcurrency: vi.fn(() => 4), initWorker: vi.fn(() => ({ run: vi.fn().mockImplementation(async (task: SecurityCheckTask) => { return await securityCheckWorker(task); diff --git a/tests/shared/processConcurrency.test.ts b/tests/shared/processConcurrency.test.ts index 289406ff6..c2b053c3a 100644 --- a/tests/shared/processConcurrency.test.ts +++ b/tests/shared/processConcurrency.test.ts @@ -77,6 +77,26 @@ describe('processConcurrency', () => { expect(minThreads).toBe(1); expect(maxThreads).toBe(1); }); + + it('should cap max threads when maxWorkerThreads is provided', () => { + // CPU has 8 cores, 1000 tasks would normally give 8 threads + const { maxThreads } = getWorkerThreadCount(1000, 3); + + expect(maxThreads).toBe(3); + }); + + it('should not exceed task-based limit even with higher maxWorkerThreads', () => { + // 200 tasks → ceil(200/100) = 2 threads, maxWorkerThreads=6 should not increase it + const { maxThreads } = getWorkerThreadCount(200, 6); + + expect(maxThreads).toBe(2); + }); + + it('should ignore maxWorkerThreads when undefined', () => { + const { maxThreads } = getWorkerThreadCount(1000, undefined); + + expect(maxThreads).toBe(8); + }); }); describe('initWorker', () => { From aff7aa918d3675b19440857f0ab7d05ad65e983f Mon Sep 17 00:00:00 2001 From: Kazuki Yamada Date: Mon, 6 Apr 2026 01:09:28 +0900 Subject: [PATCH 2/5] refactor(core): Cap security workers at 2 instead of floor(cores/2) Security check uses coarse-grained batches (50 files/batch), so 2 workers provide sufficient parallelism even for large repos. Fixing the cap at 2 avoids over-allocation on high-core machines (e.g., 8 threads on 16 cores) while keeping the formula simple and predictable. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/core/security/securityCheck.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/core/security/securityCheck.ts b/src/core/security/securityCheck.ts index bb6d23319..f747bdc98 100644 --- a/src/core/security/securityCheck.ts +++ b/src/core/security/securityCheck.ts @@ -74,10 +74,10 @@ export const runSecurityCheck = async ( const allItems = [...fileItems, ...gitDiffItems, ...gitLogItems]; const totalItems = allItems.length; - // Cap security workers at half the available CPU cores to reduce contention with the - // metrics worker pool that runs concurrently. The security check uses coarse-grained - // batches (BATCH_SIZE=50), so fewer workers still provide sufficient parallelism. - const maxSecurityWorkers = Math.max(1, Math.floor(getProcessConcurrency() / 2)); + // Cap security workers at 2 to reduce contention with the metrics worker pool that + // runs concurrently. The security check uses coarse-grained batches (BATCH_SIZE=50), + // so 2 workers provide sufficient parallelism even for large repos (1000 files = 20 batches). + const maxSecurityWorkers = Math.min(2, getProcessConcurrency()); const taskRunner = deps.initTaskRunner({ numOfTasks: totalItems, From 46a0a7b72cd0eab63d437d796db09422b7ab4dc7 Mon Sep 17 00:00:00 2001 From: Kazuki Yamada Date: Mon, 6 Apr 2026 01:16:20 +0900 Subject: [PATCH 3/5] perf(core): Remove metrics worker cap to avoid regression on low-core machines MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CI benchmarks showed macOS (3 vCPU) regressed by ~6-7% with the processConcurrency-1 cap, while Ubuntu/Windows (4 vCPU) improved. On 3 cores, capping metrics from 3→2 workers is a 33% parallelism reduction that outweighs contention savings. The main improvement comes from capping security at 2 workers, which is sufficient since the security pool uses coarse batches (50 files) and finishes quickly. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/core/metrics/calculateMetrics.ts | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/src/core/metrics/calculateMetrics.ts b/src/core/metrics/calculateMetrics.ts index f42873448..d20dea52c 100644 --- a/src/core/metrics/calculateMetrics.ts +++ b/src/core/metrics/calculateMetrics.ts @@ -1,10 +1,5 @@ import type { RepomixConfigMerged } from '../../config/configSchema.js'; -import { - getProcessConcurrency, - getWorkerThreadCount, - initTaskRunner, - type TaskRunner, -} from '../../shared/processConcurrency.js'; +import { getWorkerThreadCount, initTaskRunner, type TaskRunner } from '../../shared/processConcurrency.js'; import type { RepomixProgressCallback } from '../../shared/types.js'; import type { ProcessedFile } from '../file/fileTypes.js'; import type { GitDiffResult } from '../git/gitDiffHandle.js'; @@ -39,18 +34,13 @@ export interface MetricsTaskRunnerWithWarmup { * output generation). */ export const createMetricsTaskRunner = (numOfTasks: number, encoding: TokenEncoding): MetricsTaskRunnerWithWarmup => { - // Cap metrics workers at (processConcurrency - 1) to leave CPU headroom for the - // security worker pool that runs concurrently during the pipeline overlap phase. - const maxMetricsWorkers = Math.max(1, getProcessConcurrency() - 1); - const taskRunner = initTaskRunner({ numOfTasks, workerType: 'calculateMetrics', runtime: 'worker_threads', - maxWorkerThreads: maxMetricsWorkers, }); - const { maxThreads } = getWorkerThreadCount(numOfTasks, maxMetricsWorkers); + const { maxThreads } = getWorkerThreadCount(numOfTasks); const warmupPromise = Promise.all( Array.from({ length: maxThreads }, () => taskRunner.run({ content: '', encoding }).catch(() => 0)), ); From f49a5d06a0d4c91513a865e36b55891ebd181c51 Mon Sep 17 00:00:00 2001 From: Kazuki Yamada Date: Mon, 6 Apr 2026 01:26:48 +0900 Subject: [PATCH 4/5] fix(core): Add early return for empty input and restore numOfTasks comment Address PR review feedback: - Add early return when totalItems === 0 to skip worker pool init - Restore comment explaining why numOfTasks uses totalItems not batches.length Co-Authored-By: Claude Opus 4.6 (1M context) --- src/core/security/securityCheck.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/core/security/securityCheck.ts b/src/core/security/securityCheck.ts index f747bdc98..a244b18d7 100644 --- a/src/core/security/securityCheck.ts +++ b/src/core/security/securityCheck.ts @@ -74,11 +74,16 @@ export const runSecurityCheck = async ( const allItems = [...fileItems, ...gitDiffItems, ...gitLogItems]; const totalItems = allItems.length; + if (totalItems === 0) { + return []; + } + // Cap security workers at 2 to reduce contention with the metrics worker pool that // runs concurrently. The security check uses coarse-grained batches (BATCH_SIZE=50), // so 2 workers provide sufficient parallelism even for large repos (1000 files = 20 batches). const maxSecurityWorkers = Math.min(2, getProcessConcurrency()); + // numOfTasks uses totalItems (not batches.length) to avoid under-sizing the pool. const taskRunner = deps.initTaskRunner({ numOfTasks: totalItems, workerType: 'securityCheck', From 7de692329a246c9a67127b3f88d40f95cf0a336f Mon Sep 17 00:00:00 2001 From: Kazuki Yamada Date: Mon, 6 Apr 2026 01:29:35 +0900 Subject: [PATCH 5/5] refactor(core): Inject getProcessConcurrency via deps in securityCheck Move getProcessConcurrency from a direct module import to the deps parameter for consistency with initTaskRunner. This makes it easier to test with different concurrency values without module-level mocking. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/core/security/securityCheck.ts | 8 ++++++-- tests/core/security/securityCheck.test.ts | 13 +++++++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/src/core/security/securityCheck.ts b/src/core/security/securityCheck.ts index a244b18d7..30e5b386e 100644 --- a/src/core/security/securityCheck.ts +++ b/src/core/security/securityCheck.ts @@ -1,6 +1,9 @@ import pc from 'picocolors'; import { logger } from '../../shared/logger.js'; -import { getProcessConcurrency, initTaskRunner } from '../../shared/processConcurrency.js'; +import { + getProcessConcurrency as defaultGetProcessConcurrency, + initTaskRunner, +} from '../../shared/processConcurrency.js'; import type { RepomixProgressCallback } from '../../shared/types.js'; import type { RawFile } from '../file/fileTypes.js'; import type { GitDiffResult } from '../git/gitDiffHandle.js'; @@ -29,6 +32,7 @@ export const runSecurityCheck = async ( gitLogResult?: GitLogResult, deps = { initTaskRunner, + getProcessConcurrency: defaultGetProcessConcurrency, }, ): Promise => { const gitDiffItems: SecurityCheckItem[] = []; @@ -81,7 +85,7 @@ export const runSecurityCheck = async ( // Cap security workers at 2 to reduce contention with the metrics worker pool that // runs concurrently. The security check uses coarse-grained batches (BATCH_SIZE=50), // so 2 workers provide sufficient parallelism even for large repos (1000 files = 20 batches). - const maxSecurityWorkers = Math.min(2, getProcessConcurrency()); + const maxSecurityWorkers = Math.min(2, deps.getProcessConcurrency()); // numOfTasks uses totalItems (not batches.length) to avoid under-sizing the pool. const taskRunner = deps.initTaskRunner({ diff --git a/tests/core/security/securityCheck.test.ts b/tests/core/security/securityCheck.test.ts index 5805c8d88..4ee03091a 100644 --- a/tests/core/security/securityCheck.test.ts +++ b/tests/core/security/securityCheck.test.ts @@ -40,6 +40,8 @@ const mockFiles: RawFile[] = [ }, ]; +const mockGetProcessConcurrency = () => 4; + const mockInitTaskRunner = (_options: WorkerOptions) => { return { run: async (task: T) => { @@ -55,6 +57,7 @@ describe('runSecurityCheck', () => { it('should identify files with security issues', async () => { const result = await runSecurityCheck(mockFiles, () => {}, undefined, undefined, { initTaskRunner: mockInitTaskRunner, + getProcessConcurrency: mockGetProcessConcurrency, }); expect(result).toHaveLength(1); @@ -67,6 +70,7 @@ describe('runSecurityCheck', () => { await runSecurityCheck(mockFiles, progressCallback, undefined, undefined, { initTaskRunner: mockInitTaskRunner, + getProcessConcurrency: mockGetProcessConcurrency, }); // With 2 files and batch size 50, all files are in a single batch @@ -92,6 +96,7 @@ describe('runSecurityCheck', () => { await expect( runSecurityCheck(mockFiles, () => {}, undefined, undefined, { initTaskRunner: mockErrorTaskRunner, + getProcessConcurrency: mockGetProcessConcurrency, }), ).rejects.toThrow('Worker error'); @@ -101,6 +106,7 @@ describe('runSecurityCheck', () => { it('should handle empty file list', async () => { const result = await runSecurityCheck([], () => {}, undefined, undefined, { initTaskRunner: mockInitTaskRunner, + getProcessConcurrency: mockGetProcessConcurrency, }); expect(result).toEqual([]); @@ -109,6 +115,7 @@ describe('runSecurityCheck', () => { it('should log performance metrics in trace mode', async () => { await runSecurityCheck(mockFiles, () => {}, undefined, undefined, { initTaskRunner: mockInitTaskRunner, + getProcessConcurrency: mockGetProcessConcurrency, }); expect(logger.trace).toHaveBeenCalledWith(expect.stringContaining('Starting security check for')); @@ -120,6 +127,7 @@ describe('runSecurityCheck', () => { await runSecurityCheck(mockFiles, () => {}, undefined, undefined, { initTaskRunner: mockInitTaskRunner, + getProcessConcurrency: mockGetProcessConcurrency, }); const endTime = Date.now(); @@ -134,6 +142,7 @@ describe('runSecurityCheck', () => { await runSecurityCheck(mockFiles, () => {}, undefined, undefined, { initTaskRunner: mockInitTaskRunner, + getProcessConcurrency: mockGetProcessConcurrency, }); expect(mockFiles).toEqual(originalFiles); @@ -160,6 +169,7 @@ describe('runSecurityCheck', () => { const progressCallback = vi.fn(); const result = await runSecurityCheck(mockFiles, progressCallback, gitDiffResult, undefined, { initTaskRunner: mockInitTaskRunner, + getProcessConcurrency: mockGetProcessConcurrency, }); // With batch size 50 and 4 items (2 files + 2 git diffs), all in a single batch @@ -178,6 +188,7 @@ describe('runSecurityCheck', () => { const progressCallback = vi.fn(); await runSecurityCheck(mockFiles, progressCallback, gitDiffResult, undefined, { initTaskRunner: mockInitTaskRunner, + getProcessConcurrency: mockGetProcessConcurrency, }); // With batch size 50 and 3 items (2 files + 1 git diff), all in a single batch @@ -193,6 +204,7 @@ describe('runSecurityCheck', () => { const progressCallback = vi.fn(); await runSecurityCheck(mockFiles, progressCallback, gitDiffResult, undefined, { initTaskRunner: mockInitTaskRunner, + getProcessConcurrency: mockGetProcessConcurrency, }); // With batch size 50 and 3 items (2 files + 1 git diff), all in a single batch @@ -208,6 +220,7 @@ describe('runSecurityCheck', () => { const progressCallback = vi.fn(); await runSecurityCheck(mockFiles, progressCallback, gitDiffResult, undefined, { initTaskRunner: mockInitTaskRunner, + getProcessConcurrency: mockGetProcessConcurrency, }); // Should process only 2 files, no git diff content because both are empty strings (falsy)