Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions src/core/security/securityCheck.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import pc from 'picocolors';
import { logger } from '../../shared/logger.js';
import { initTaskRunner } from '../../shared/processConcurrency.js';
import {
getProcessConcurrency as defaultGetProcessConcurrency,
initTaskRunner,
} from '../../shared/processConcurrency.js';
import type { RepomixProgressCallback } from '../../shared/types.js';
import type { RawFile } from '../file/fileTypes.js';
import type { GitDiffResult } from '../git/gitDiffHandle.js';
Expand Down Expand Up @@ -29,6 +32,7 @@ export const runSecurityCheck = async (
gitLogResult?: GitLogResult,
deps = {
initTaskRunner,
getProcessConcurrency: defaultGetProcessConcurrency,
},
): Promise<SuspiciousFileResult[]> => {
const gitDiffItems: SecurityCheckItem[] = [];
Expand Down Expand Up @@ -74,14 +78,21 @@ export const runSecurityCheck = async (
const allItems = [...fileItems, ...gitDiffItems, ...gitLogItems];
const totalItems = allItems.length;

// NOTE: numOfTasks uses totalItems (not batches.length) intentionally.
// getWorkerThreadCount uses Math.ceil(numOfTasks / TASKS_PER_THREAD) to size the pool,
// where TASKS_PER_THREAD=100 is calibrated for fine-grained tasks.
// Passing batches.length (e.g. 2) would yield maxThreads=1, forcing sequential execution.
if (totalItems === 0) {
return [];
}

// Cap security workers at 2 to reduce contention with the metrics worker pool that
// runs concurrently. The security check uses coarse-grained batches (BATCH_SIZE=50),
// so 2 workers provide sufficient parallelism even for large repos (1000 files = 20 batches).
const maxSecurityWorkers = Math.min(2, deps.getProcessConcurrency());

// numOfTasks uses totalItems (not batches.length) to avoid under-sizing the pool.
const taskRunner = deps.initTaskRunner<SecurityCheckTask, (SuspiciousFileResult | null)[]>({
numOfTasks: totalItems,
workerType: 'securityCheck',
runtime: 'worker_threads',
maxWorkerThreads: maxSecurityWorkers,
});

// Split items into batches to reduce IPC round-trips
Expand Down
16 changes: 12 additions & 4 deletions src/shared/processConcurrency.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export interface WorkerOptions {
numOfTasks: number;
workerType: WorkerType;
runtime: WorkerRuntime;
maxWorkerThreads?: number;
}

/**
Expand Down Expand Up @@ -45,13 +46,20 @@ export const getProcessConcurrency = (): number => {
return typeof os.availableParallelism === 'function' ? os.availableParallelism() : os.cpus().length;
};

export const getWorkerThreadCount = (numOfTasks: number): { minThreads: number; maxThreads: number } => {
export const getWorkerThreadCount = (
numOfTasks: number,
maxWorkerThreads?: number,
): { minThreads: number; maxThreads: number } => {
const processConcurrency = getProcessConcurrency();

const minThreads = 1;

// Apply optional cap to limit thread count (e.g., to reduce contention with other concurrent pools)
const effectiveConcurrency =
maxWorkerThreads != null ? Math.min(processConcurrency, maxWorkerThreads) : processConcurrency;

// Limit max threads based on number of tasks
const maxThreads = Math.max(minThreads, Math.min(processConcurrency, Math.ceil(numOfTasks / TASKS_PER_THREAD)));
const maxThreads = Math.max(minThreads, Math.min(effectiveConcurrency, Math.ceil(numOfTasks / TASKS_PER_THREAD)));

return {
minThreads,
Expand All @@ -60,8 +68,8 @@ export const getWorkerThreadCount = (numOfTasks: number): { minThreads: number;
};

export const createWorkerPool = (options: WorkerOptions): Tinypool => {
const { numOfTasks, workerType, runtime = 'child_process' } = options;
const { minThreads, maxThreads } = getWorkerThreadCount(numOfTasks);
const { numOfTasks, workerType, runtime = 'child_process', maxWorkerThreads } = options;
const { minThreads, maxThreads } = getWorkerThreadCount(numOfTasks, maxWorkerThreads);

// Get worker path - uses unified worker in bundled env, individual files otherwise
const workerPath = getWorkerPath(workerType);
Expand Down
14 changes: 14 additions & 0 deletions tests/core/security/securityCheck.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type { WorkerOptions } from '../../../src/shared/processConcurrency.js';

vi.mock('../../../src/shared/logger');
vi.mock('../../../src/shared/processConcurrency', () => ({
getProcessConcurrency: vi.fn(() => 4),
initWorker: vi.fn(() => ({
run: vi.fn().mockImplementation(async (task: SecurityCheckTask) => {
return await securityCheckWorker(task);
Expand Down Expand Up @@ -39,6 +40,8 @@ const mockFiles: RawFile[] = [
},
];

// Stub passed to runSecurityCheck as the `getProcessConcurrency` dependency; always returns 4.
const mockGetProcessConcurrency = (): number => {
  return 4;
};

const mockInitTaskRunner = <T, R>(_options: WorkerOptions) => {
return {
run: async (task: T) => {
Expand All @@ -54,6 +57,7 @@ describe('runSecurityCheck', () => {
it('should identify files with security issues', async () => {
const result = await runSecurityCheck(mockFiles, () => {}, undefined, undefined, {
initTaskRunner: mockInitTaskRunner,
getProcessConcurrency: mockGetProcessConcurrency,
});

expect(result).toHaveLength(1);
Expand All @@ -66,6 +70,7 @@ describe('runSecurityCheck', () => {

await runSecurityCheck(mockFiles, progressCallback, undefined, undefined, {
initTaskRunner: mockInitTaskRunner,
getProcessConcurrency: mockGetProcessConcurrency,
});

// With 2 files and batch size 50, all files are in a single batch
Expand All @@ -91,6 +96,7 @@ describe('runSecurityCheck', () => {
await expect(
runSecurityCheck(mockFiles, () => {}, undefined, undefined, {
initTaskRunner: mockErrorTaskRunner,
getProcessConcurrency: mockGetProcessConcurrency,
}),
).rejects.toThrow('Worker error');

Expand All @@ -100,6 +106,7 @@ describe('runSecurityCheck', () => {
it('should handle empty file list', async () => {
const result = await runSecurityCheck([], () => {}, undefined, undefined, {
initTaskRunner: mockInitTaskRunner,
getProcessConcurrency: mockGetProcessConcurrency,
});

expect(result).toEqual([]);
Expand All @@ -108,6 +115,7 @@ describe('runSecurityCheck', () => {
it('should log performance metrics in trace mode', async () => {
await runSecurityCheck(mockFiles, () => {}, undefined, undefined, {
initTaskRunner: mockInitTaskRunner,
getProcessConcurrency: mockGetProcessConcurrency,
});

expect(logger.trace).toHaveBeenCalledWith(expect.stringContaining('Starting security check for'));
Expand All @@ -119,6 +127,7 @@ describe('runSecurityCheck', () => {

await runSecurityCheck(mockFiles, () => {}, undefined, undefined, {
initTaskRunner: mockInitTaskRunner,
getProcessConcurrency: mockGetProcessConcurrency,
});

const endTime = Date.now();
Expand All @@ -133,6 +142,7 @@ describe('runSecurityCheck', () => {

await runSecurityCheck(mockFiles, () => {}, undefined, undefined, {
initTaskRunner: mockInitTaskRunner,
getProcessConcurrency: mockGetProcessConcurrency,
});

expect(mockFiles).toEqual(originalFiles);
Expand All @@ -159,6 +169,7 @@ describe('runSecurityCheck', () => {
const progressCallback = vi.fn();
const result = await runSecurityCheck(mockFiles, progressCallback, gitDiffResult, undefined, {
initTaskRunner: mockInitTaskRunner,
getProcessConcurrency: mockGetProcessConcurrency,
});

// With batch size 50 and 4 items (2 files + 2 git diffs), all in a single batch
Expand All @@ -177,6 +188,7 @@ describe('runSecurityCheck', () => {
const progressCallback = vi.fn();
await runSecurityCheck(mockFiles, progressCallback, gitDiffResult, undefined, {
initTaskRunner: mockInitTaskRunner,
getProcessConcurrency: mockGetProcessConcurrency,
});

// With batch size 50 and 3 items (2 files + 1 git diff), all in a single batch
Expand All @@ -192,6 +204,7 @@ describe('runSecurityCheck', () => {
const progressCallback = vi.fn();
await runSecurityCheck(mockFiles, progressCallback, gitDiffResult, undefined, {
initTaskRunner: mockInitTaskRunner,
getProcessConcurrency: mockGetProcessConcurrency,
});

// With batch size 50 and 3 items (2 files + 1 git diff), all in a single batch
Expand All @@ -207,6 +220,7 @@ describe('runSecurityCheck', () => {
const progressCallback = vi.fn();
await runSecurityCheck(mockFiles, progressCallback, gitDiffResult, undefined, {
initTaskRunner: mockInitTaskRunner,
getProcessConcurrency: mockGetProcessConcurrency,
});

// Should process only 2 files, no git diff content because both are empty strings (falsy)
Expand Down
20 changes: 20 additions & 0 deletions tests/shared/processConcurrency.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,26 @@ describe('processConcurrency', () => {
expect(minThreads).toBe(1);
expect(maxThreads).toBe(1);
});

it('should cap max threads when maxWorkerThreads is provided', () => {
  // On the 8-core CPU this suite assumes, 1000 tasks would normally yield 8 threads;
  // the explicit cap of 3 must take precedence.
  const threadCounts = getWorkerThreadCount(1000, 3);

  expect(threadCounts.maxThreads).toBe(3);
});

it('should not exceed task-based limit even with higher maxWorkerThreads', () => {
  // The task-based limit is ceil(200 / 100) = 2 threads; a cap of 6 is only an
  // upper bound and must not raise the result above that.
  const threadCounts = getWorkerThreadCount(200, 6);

  expect(threadCounts.maxThreads).toBe(2);
});

it('should ignore maxWorkerThreads when undefined', () => {
  // Passing undefined explicitly must leave the thread count uncapped.
  // NOTE(review): the expected 8 presumes the suite stubs the CPU count to 8 — confirm in setup.
  const threadCounts = getWorkerThreadCount(1000, undefined);

  expect(threadCounts.maxThreads).toBe(8);
});
});

describe('initWorker', () => {
Expand Down
Loading