From 8f07b63a616c558fdbd6cf7f7a991ee1c5bccb67 Mon Sep 17 00:00:00 2001 From: Kazuki Yamada Date: Thu, 28 Aug 2025 19:48:35 +0900 Subject: [PATCH 1/5] feat(core): Add runtime selection support for worker pools Add WorkerRuntime type and configurable runtime parameter to createWorkerPool and initTaskRunner functions. This allows choosing between 'worker_threads' and 'child_process' runtimes based on performance requirements. - Add WorkerRuntime type definition for type safety - Add optional runtime parameter to createWorkerPool with child_process default - Add optional runtime parameter to initTaskRunner with child_process default - Configure fileCollectWorker to use worker_threads for better performance - Update all test files to use WorkerRuntime type - Add comprehensive tests for runtime parameter functionality - Maintain backward compatibility with existing code The fileCollectWorker now benefits from worker_threads faster startup and shared memory, while other workers continue using child_process for stability. --- benchmarks/memory/src/memory-test.ts | 2 +- src/core/file/fileCollect.ts | 1 + src/shared/processConcurrency.ts | 23 +++++++++---- tests/core/file/fileCollect.test.ts | 34 ++++++++++++++++++- tests/core/file/fileProcess.test.ts | 3 +- .../metrics/calculateOutputMetrics.test.ts | 11 +++--- .../calculateSelectiveFileMetrics.test.ts | 3 +- tests/core/security/securityCheck.test.ts | 5 +-- tests/integration-tests/packager.test.ts | 3 +- tests/shared/processConcurrency.test.ts | 30 ++++++++++++++++ 10 files changed, 96 insertions(+), 19 deletions(-) diff --git a/benchmarks/memory/src/memory-test.ts b/benchmarks/memory/src/memory-test.ts index e31511a52..fbb6a5c6c 100644 --- a/benchmarks/memory/src/memory-test.ts +++ b/benchmarks/memory/src/memory-test.ts @@ -26,7 +26,7 @@ const flags = { // Extract numeric arguments const numericArgs = args.filter((arg) => !arg.startsWith('-') && !Number.isNaN(Number(arg))); -const iterations = Number(numericArgs[0]) || (flags.full ? 200 : 50); +const iterations = Number(numericArgs[0]) || (flags.full ? 200 : 100); const delay = Number(numericArgs[1]) || (flags.full ? 100 : 50); // Configuration diff --git a/src/core/file/fileCollect.ts b/src/core/file/fileCollect.ts index 6cd284f52..bda4cb2cc 100644 --- a/src/core/file/fileCollect.ts +++ b/src/core/file/fileCollect.ts @@ -26,6 +26,7 @@ export const collectFiles = async ( const taskRunner = deps.initTaskRunner( filePaths.length, new URL('./workers/fileCollectWorker.js', import.meta.url).href, + 'worker_threads', ); const tasks = filePaths.map( (filePath) => diff --git a/src/shared/processConcurrency.ts b/src/shared/processConcurrency.ts index be6dc6ad0..c4884fee7 100644 --- a/src/shared/processConcurrency.ts +++ b/src/shared/processConcurrency.ts @@ -1,7 +1,9 @@ import os from 'node:os'; -import { Tinypool } from 'tinypool'; +import { Options, Tinypool } from 'tinypool'; import { logger } from './logger.js'; +export type WorkerRuntime = NonNullable; + // Worker initialization is expensive, so we prefer fewer threads unless there are many files const TASKS_PER_THREAD = 100; @@ -23,19 +25,22 @@ export const getWorkerThreadCount = (numOfTasks: number): { minThreads: number; }; }; -export const createWorkerPool = (numOfTasks: number, workerPath: string): Tinypool => { +export const createWorkerPool = ( + numOfTasks: number, + workerPath: string, + runtime: WorkerRuntime = 'child_process', +): Tinypool => { const { minThreads, maxThreads } = getWorkerThreadCount(numOfTasks); logger.trace( - `Initializing worker pool with min=${minThreads}, max=${maxThreads} threads. Worker path: ${workerPath}`, + `Initializing worker pool with min=${minThreads}, max=${maxThreads} threads, runtime=${runtime}. Worker path: ${workerPath}`, ); const startTime = process.hrtime.bigint(); const pool = new Tinypool({ filename: workerPath, - // Use child_process for better memory management - runtime: 'child_process', + runtime, minThreads, maxThreads, idleTimeout: 5000, @@ -78,8 +83,12 @@ export interface TaskRunner { cleanup: () => Promise; } -export const initTaskRunner = (numOfTasks: number, workerPath: string): TaskRunner => { - const pool = createWorkerPool(numOfTasks, workerPath); +export const initTaskRunner = ( + numOfTasks: number, + workerPath: string, + runtime: WorkerRuntime = 'child_process', +): TaskRunner => { + const pool = createWorkerPool(numOfTasks, workerPath, runtime); return { run: (task: T) => pool.run(task), cleanup: () => cleanupWorkerPool(pool), diff --git a/tests/core/file/fileCollect.test.ts b/tests/core/file/fileCollect.test.ts index c50d9744c..c1edb9cf9 100644 --- a/tests/core/file/fileCollect.test.ts +++ b/tests/core/file/fileCollect.test.ts @@ -9,6 +9,7 @@ import { collectFiles } from '../../../src/core/file/fileCollect.js'; import type { FileCollectTask } from '../../../src/core/file/workers/fileCollectWorker.js'; import fileCollectWorker from '../../../src/core/file/workers/fileCollectWorker.js'; import { logger } from '../../../src/shared/logger.js'; +import type { WorkerRuntime } from '../../../src/shared/processConcurrency.js'; import { createMockConfig } from '../../testing/testUtils.js'; // Define the max file size constant for tests @@ -20,7 +21,21 @@ vi.mock('jschardet'); vi.mock('iconv-lite'); vi.mock('../../../src/shared/logger'); -const mockInitTaskRunner = (_numOfTasks: number, _workerPath: string) => { +interface MockInitTaskRunner { + ( + numOfTasks: number, + workerPath: string, + runtime?: WorkerRuntime, + ): { + run: (task: T) => Promise; + cleanup: () => Promise; + }; + lastRuntime?: WorkerRuntime; +} + +const mockInitTaskRunner = (_numOfTasks: number, _workerPath: string, runtime?: WorkerRuntime) => { + // Store runtime for verification in tests + (mockInitTaskRunner as MockInitTaskRunner).lastRuntime = runtime; return { run: async (task: T) => { return (await fileCollectWorker(task as FileCollectTask)) as R; @@ -195,4 +210,21 @@ describe('fileCollect', () => { expect.any(Error), ); }); + + it('should use worker_threads runtime when calling initTaskRunner', async () => { + const mockFilePaths = ['test.txt']; + const mockRootDir = '/root'; + const mockConfig = createMockConfig(); + + vi.mocked(isBinary).mockReturnValue(false); + vi.mocked(fs.readFile).mockResolvedValue(Buffer.from('file content')); + vi.mocked(jschardet.detect).mockReturnValue({ encoding: 'utf-8', confidence: 0.99 }); + vi.mocked(iconv.decode).mockReturnValue('decoded content'); + + await collectFiles(mockFilePaths, mockRootDir, mockConfig, () => {}, { + initTaskRunner: mockInitTaskRunner, + }); + + expect((mockInitTaskRunner as MockInitTaskRunner).lastRuntime).toBe('worker_threads'); + }); }); diff --git a/tests/core/file/fileProcess.test.ts b/tests/core/file/fileProcess.test.ts index 1237b107c..e5c0f9238 100644 --- a/tests/core/file/fileProcess.test.ts +++ b/tests/core/file/fileProcess.test.ts @@ -5,6 +5,7 @@ import { processContent } from '../../../src/core/file/fileProcessContent.js'; import type { RawFile } from '../../../src/core/file/fileTypes.js'; import type { FileProcessTask } from '../../../src/core/file/workers/fileProcessWorker.js'; import fileProcessWorker from '../../../src/core/file/workers/fileProcessWorker.js'; +import type { WorkerRuntime } from '../../../src/shared/processConcurrency.js'; import { createMockConfig } from '../../testing/testUtils.js'; const createMockFileManipulator = (): FileManipulator => ({ @@ -19,7 +20,7 @@ const mockGetFileManipulator = (filePath: string): FileManipulator | null => { return null; }; -const mockInitTaskRunner = (_numOfTasks: number, _workerPath: string) => { +const mockInitTaskRunner = (_numOfTasks: number, _workerPath: string, _runtime?: WorkerRuntime) => { return { run: async (task: T) => { return (await fileProcessWorker(task as FileProcessTask)) as R; diff --git a/tests/core/metrics/calculateOutputMetrics.test.ts b/tests/core/metrics/calculateOutputMetrics.test.ts index 4844a54c8..d44fd0697 100644 --- a/tests/core/metrics/calculateOutputMetrics.test.ts +++ b/tests/core/metrics/calculateOutputMetrics.test.ts @@ -3,10 +3,11 @@ import { calculateOutputMetrics } from '../../../src/core/metrics/calculateOutpu import type { OutputMetricsTask } from '../../../src/core/metrics/workers/outputMetricsWorker.js'; import outputMetricsWorker from '../../../src/core/metrics/workers/outputMetricsWorker.js'; import { logger } from '../../../src/shared/logger.js'; +import type { WorkerRuntime } from '../../../src/shared/processConcurrency.js'; vi.mock('../../../src/shared/logger'); -const mockInitTaskRunner = (_numOfTasks: number, _workerPath: string) => { +const mockInitTaskRunner = (_numOfTasks: number, _workerPath: string, _runtime?: WorkerRuntime) => { return { run: async (task: T) => { return (await outputMetricsWorker(task as OutputMetricsTask)) as R; @@ -46,7 +47,7 @@ describe('calculateOutputMetrics', () => { const encoding = 'o200k_base'; const mockError = new Error('Worker error'); - const mockErrorTaskRunner = (_numOfTasks: number, _workerPath: string) => { + const mockErrorTaskRunner = (_numOfTasks: number, _workerPath: string, _runtime?: WorkerRuntime) => { return { run: async (_task: T) => { throw mockError; @@ -96,7 +97,7 @@ describe('calculateOutputMetrics', () => { const path = 'large-file.txt'; let chunksProcessed = 0; - const mockParallelTaskRunner = (_numOfTasks: number, _workerPath: string) => { + const mockParallelTaskRunner = (_numOfTasks: number, _workerPath: string, _runtime?: WorkerRuntime) => { return { run: async (_task: T) => { chunksProcessed++; @@ -122,7 +123,7 @@ describe('calculateOutputMetrics', () => { const encoding = 'o200k_base'; const mockError = new Error('Parallel processing error'); - const mockErrorTaskRunner = (_numOfTasks: number, _workerPath: string) => { + const mockErrorTaskRunner = (_numOfTasks: number, _workerPath: string, _runtime?: WorkerRuntime) => { return { run: async (_task: T) => { throw mockError; @@ -147,7 +148,7 @@ describe('calculateOutputMetrics', () => { const encoding = 'o200k_base'; const processedChunks: string[] = []; - const mockChunkTrackingTaskRunner = (_numOfTasks: number, _workerPath: string) => { + const mockChunkTrackingTaskRunner = (_numOfTasks: number, _workerPath: string, _runtime?: WorkerRuntime) => { return { run: async (task: T) => { const outputTask = task as OutputMetricsTask; diff --git a/tests/core/metrics/calculateSelectiveFileMetrics.test.ts b/tests/core/metrics/calculateSelectiveFileMetrics.test.ts index 30b9b5dd9..462646f76 100644 --- a/tests/core/metrics/calculateSelectiveFileMetrics.test.ts +++ b/tests/core/metrics/calculateSelectiveFileMetrics.test.ts @@ -3,13 +3,14 @@ import type { ProcessedFile } from '../../../src/core/file/fileTypes.js'; import { calculateSelectiveFileMetrics } from '../../../src/core/metrics/calculateSelectiveFileMetrics.js'; import type { FileMetricsTask } from '../../../src/core/metrics/workers/fileMetricsWorker.js'; import fileMetricsWorker from '../../../src/core/metrics/workers/fileMetricsWorker.js'; +import type { WorkerRuntime } from '../../../src/shared/processConcurrency.js'; import type { RepomixProgressCallback } from '../../../src/shared/types.js'; vi.mock('../../shared/processConcurrency', () => ({ getProcessConcurrency: () => 1, })); -const mockInitTaskRunner = (_numOfTasks: number, _workerPath: string) => { +const mockInitTaskRunner = (_numOfTasks: number, _workerPath: string, _runtime?: WorkerRuntime) => { return { run: async (task: T) => { return (await fileMetricsWorker(task as FileMetricsTask)) as R; diff --git a/tests/core/security/securityCheck.test.ts b/tests/core/security/securityCheck.test.ts index bfe33e21c..1f7a1cc5e 100644 --- a/tests/core/security/securityCheck.test.ts +++ b/tests/core/security/securityCheck.test.ts @@ -9,6 +9,7 @@ import type { SecurityCheckTask } from '../../../src/core/security/workers/secur import securityCheckWorker from '../../../src/core/security/workers/securityCheckWorker.js'; import { logger } from '../../../src/shared/logger.js'; import { repomixLogLevels } from '../../../src/shared/logger.js'; +import type { WorkerRuntime } from '../../../src/shared/processConcurrency.js'; vi.mock('../../../src/shared/logger'); vi.mock('../../../src/shared/processConcurrency', () => ({ @@ -39,7 +40,7 @@ const mockFiles: RawFile[] = [ }, ]; -const mockInitTaskRunner = (_numOfTasks: number, _workerPath: string) => { +const mockInitTaskRunner = (_numOfTasks: number, _workerPath: string, _runtime?: WorkerRuntime) => { return { run: async (task: T) => { return (await securityCheckWorker(task as SecurityCheckTask)) as R; @@ -78,7 +79,7 @@ describe('runSecurityCheck', () => { it('should handle worker errors gracefully', async () => { const mockError = new Error('Worker error'); - const mockErrorTaskRunner = () => { + const mockErrorTaskRunner = (_numOfTasks?: number, _workerPath?: string, _runtime?: WorkerRuntime) => { return { run: async () => { throw mockError; diff --git a/tests/integration-tests/packager.test.ts b/tests/integration-tests/packager.test.ts index 41a2ff079..2058a8d4a 100644 --- a/tests/integration-tests/packager.test.ts +++ b/tests/integration-tests/packager.test.ts @@ -24,13 +24,14 @@ import { copyToClipboardIfEnabled } from '../../src/core/packager/copyToClipboar import { writeOutputToDisk } from '../../src/core/packager/writeOutputToDisk.js'; import { filterOutUntrustedFiles } from '../../src/core/security/filterOutUntrustedFiles.js'; import { validateFileSafety } from '../../src/core/security/validateFileSafety.js'; +import type { WorkerRuntime } from '../../src/shared/processConcurrency.js'; import { isWindows } from '../testing/testUtils.js'; const fixturesDir = path.join(__dirname, 'fixtures', 'packager'); const inputsDir = path.join(fixturesDir, 'inputs'); const outputsDir = path.join(fixturesDir, 'outputs'); -const mockCollectFileInitTaskRunner = (_numOfTasks: number, _workerPath: string) => { +const mockCollectFileInitTaskRunner = (_numOfTasks: number, _workerPath: string, _runtime?: WorkerRuntime) => { return { run: async (task: T) => { return (await fileCollectWorker(task as FileCollectTask)) as R; diff --git a/tests/shared/processConcurrency.test.ts b/tests/shared/processConcurrency.test.ts index 6970c0b8f..888473ce8 100644 --- a/tests/shared/processConcurrency.test.ts +++ b/tests/shared/processConcurrency.test.ts @@ -87,6 +87,23 @@ describe('processConcurrency', () => { }); expect(tinypool).toBeDefined(); }); + + it('should initialize Tinypool with worker_threads runtime when specified', () => { + const workerPath = '/path/to/worker.js'; + const tinypool = createWorkerPool(500, workerPath, 'worker_threads'); + + expect(Tinypool).toHaveBeenCalledWith({ + filename: workerPath, + runtime: 'worker_threads', + minThreads: 1, + maxThreads: 4, // Math.min(4, 500/100) = 4 + idleTimeout: 5000, + workerData: { + logLevel: 2, + }, + }); + expect(tinypool).toBeDefined(); + }); }); describe('initTaskRunner', () => { @@ -110,5 +127,18 @@ describe('processConcurrency', () => { expect(typeof taskRunner.run).toBe('function'); expect(typeof taskRunner.cleanup).toBe('function'); }); + + it('should pass runtime parameter to createWorkerPool', () => { + const workerPath = '/path/to/worker.js'; + const taskRunner = initTaskRunner(100, workerPath, 'worker_threads'); + + expect(Tinypool).toHaveBeenCalledWith( + expect.objectContaining({ + runtime: 'worker_threads', + }), + ); + expect(taskRunner).toHaveProperty('run'); + expect(taskRunner).toHaveProperty('cleanup'); + }); }); }); From 25d65dfe7c76badaca9459c3ebd78856c09ae2a8 Mon Sep 17 00:00:00 2001 From: Kazuki Yamada Date: Sun, 31 Aug 2025 16:24:38 +0900 Subject: [PATCH 2/5] refactor(core): Consolidate worker pool arguments into WorkerOptions interface - Add WorkerOptions interface to combine numOfTasks, workerPath, and optional runtime - Update createWorkerPool and initTaskRunner functions to accept WorkerOptions object - Refactor all usage sites across file processing, metrics, and security modules - Update corresponding test cases to use new interface This improves type safety and makes the API more maintainable by avoiding parameter order mistakes. --- src/core/file/fileCollect.ts | 10 ++++----- src/core/file/fileProcess.ts | 8 +++---- src/core/file/globbyExecute.ts | 8 +++---- src/core/metrics/calculateGitDiffMetrics.ts | 8 +++---- src/core/metrics/calculateGitLogMetrics.ts | 8 +++---- src/core/metrics/calculateOutputMetrics.ts | 6 +++--- .../metrics/calculateSelectiveFileMetrics.ts | 8 +++---- src/core/security/securityCheck.ts | 8 +++---- src/shared/processConcurrency.ts | 21 +++++++++---------- tests/shared/processConcurrency.test.ts | 8 +++---- 10 files changed, 46 insertions(+), 47 deletions(-) diff --git a/src/core/file/fileCollect.ts b/src/core/file/fileCollect.ts index bda4cb2cc..e5f1c2894 100644 --- a/src/core/file/fileCollect.ts +++ b/src/core/file/fileCollect.ts @@ -23,11 +23,11 @@ export const collectFiles = async ( initTaskRunner, }, ): Promise => { - const taskRunner = deps.initTaskRunner( - filePaths.length, - new URL('./workers/fileCollectWorker.js', import.meta.url).href, - 'worker_threads', - ); + const taskRunner = deps.initTaskRunner({ + numOfTasks: filePaths.length, + workerPath: new URL('./workers/fileCollectWorker.js', import.meta.url).href, + runtime: 'worker_threads', + }); const tasks = filePaths.map( (filePath) => ({ diff --git a/src/core/file/fileProcess.ts b/src/core/file/fileProcess.ts index bc35fdda5..d113f9d96 100644 --- a/src/core/file/fileProcess.ts +++ b/src/core/file/fileProcess.ts @@ -21,10 +21,10 @@ export const processFiles = async ( getFileManipulator, }, ): Promise => { - const taskRunner = deps.initTaskRunner( - rawFiles.length, - new URL('./workers/fileProcessWorker.js', import.meta.url).href, - ); + const taskRunner = deps.initTaskRunner({ + numOfTasks: rawFiles.length, + workerPath: new URL('./workers/fileProcessWorker.js', import.meta.url).href, + }); const tasks = rawFiles.map( (rawFile, _index) => ({ diff --git a/src/core/file/globbyExecute.ts b/src/core/file/globbyExecute.ts index 48e74f18d..1a424cd6d 100644 --- a/src/core/file/globbyExecute.ts +++ b/src/core/file/globbyExecute.ts @@ -13,10 +13,10 @@ export const executeGlobbyInWorker = async ( initTaskRunner, }, ): Promise => { - const taskRunner = deps.initTaskRunner( - 1, - new URL('./workers/globbyWorker.js', import.meta.url).href, - ); + const taskRunner = deps.initTaskRunner({ + numOfTasks: 1, + workerPath: new URL('./workers/globbyWorker.js', import.meta.url).href, + }); try { logger.trace('Starting globby in worker for memory isolation'); diff --git a/src/core/metrics/calculateGitDiffMetrics.ts b/src/core/metrics/calculateGitDiffMetrics.ts index 994c72859..ff67aa2d2 100644 --- a/src/core/metrics/calculateGitDiffMetrics.ts +++ b/src/core/metrics/calculateGitDiffMetrics.ts @@ -23,10 +23,10 @@ export const calculateGitDiffMetrics = async ( return 0; } - const taskRunner = deps.initTaskRunner( - 1, // Single task for git diff calculation - new URL('./workers/gitDiffMetricsWorker.js', import.meta.url).href, - ); + const taskRunner = deps.initTaskRunner({ + numOfTasks: 1, // Single task for git diff calculation + workerPath: new URL('./workers/gitDiffMetricsWorker.js', import.meta.url).href, + }); try { const startTime = process.hrtime.bigint(); diff --git a/src/core/metrics/calculateGitLogMetrics.ts b/src/core/metrics/calculateGitLogMetrics.ts index 6eebdaff1..11e5bd253 100644 --- a/src/core/metrics/calculateGitLogMetrics.ts +++ b/src/core/metrics/calculateGitLogMetrics.ts @@ -28,10 +28,10 @@ export const calculateGitLogMetrics = async ( }; } - const taskRunner = deps.initTaskRunner( - 1, // Single task for git log calculation - new URL('./workers/gitLogMetricsWorker.js', import.meta.url).href, - ); + const taskRunner = deps.initTaskRunner({ + numOfTasks: 1, // Single task for git log calculation + workerPath: new URL('./workers/gitLogMetricsWorker.js', import.meta.url).href, + }); try { const startTime = process.hrtime.bigint(); diff --git a/src/core/metrics/calculateOutputMetrics.ts b/src/core/metrics/calculateOutputMetrics.ts index 04bbf8930..16ae0bc9f 100644 --- a/src/core/metrics/calculateOutputMetrics.ts +++ b/src/core/metrics/calculateOutputMetrics.ts @@ -16,10 +16,10 @@ export const calculateOutputMetrics = async ( ): Promise => { const shouldRunInParallel = content.length > MIN_CONTENT_LENGTH_FOR_PARALLEL; const numOfTasks = shouldRunInParallel ? CHUNK_SIZE : 1; - const taskRunner = deps.initTaskRunner( + const taskRunner = deps.initTaskRunner({ numOfTasks, - new URL('./workers/outputMetricsWorker.js', import.meta.url).href, - ); + workerPath: new URL('./workers/outputMetricsWorker.js', import.meta.url).href, + }); try { logger.trace(`Starting output token count for ${path || 'output'}`); diff --git a/src/core/metrics/calculateSelectiveFileMetrics.ts b/src/core/metrics/calculateSelectiveFileMetrics.ts index 3c7f4387e..65f928cab 100644 --- a/src/core/metrics/calculateSelectiveFileMetrics.ts +++ b/src/core/metrics/calculateSelectiveFileMetrics.ts @@ -23,10 +23,10 @@ export const calculateSelectiveFileMetrics = async ( return []; } - const taskRunner = deps.initTaskRunner( - filesToProcess.length, - new URL('./workers/fileMetricsWorker.js', import.meta.url).href, - ); + const taskRunner = deps.initTaskRunner({ + numOfTasks: filesToProcess.length, + workerPath: new URL('./workers/fileMetricsWorker.js', import.meta.url).href, + }); const tasks = filesToProcess.map( (file, index) => ({ diff --git a/src/core/security/securityCheck.ts b/src/core/security/securityCheck.ts index 9e572ee25..47439c787 100644 --- a/src/core/security/securityCheck.ts +++ b/src/core/security/securityCheck.ts @@ -55,10 +55,10 @@ export const runSecurityCheck = async ( } } - const taskRunner = deps.initTaskRunner( - rawFiles.length + gitDiffTasks.length + gitLogTasks.length, - new URL('./workers/securityCheckWorker.js', import.meta.url).href, - ); + const taskRunner = deps.initTaskRunner({ + numOfTasks: rawFiles.length + gitDiffTasks.length + gitLogTasks.length, + workerPath: new URL('./workers/securityCheckWorker.js', import.meta.url).href, + }); const fileTasks = rawFiles.map( (file) => ({ diff --git a/src/shared/processConcurrency.ts b/src/shared/processConcurrency.ts index c4884fee7..ba2908159 100644 --- a/src/shared/processConcurrency.ts +++ b/src/shared/processConcurrency.ts @@ -4,6 +4,12 @@ import { logger } from './logger.js'; export type WorkerRuntime = NonNullable; +export interface WorkerOptions { + numOfTasks: number; + workerPath: string; + runtime?: WorkerRuntime; +} + // Worker initialization is expensive, so we prefer fewer threads unless there are many files const TASKS_PER_THREAD = 100; @@ -25,11 +31,8 @@ export const getWorkerThreadCount = (numOfTasks: number): { minThreads: number; }; }; -export const createWorkerPool = ( - numOfTasks: number, - workerPath: string, - runtime: WorkerRuntime = 'child_process', -): Tinypool => { +export const createWorkerPool = (options: WorkerOptions): Tinypool => { + const { numOfTasks, workerPath, runtime = 'child_process' } = options; const { minThreads, maxThreads } = getWorkerThreadCount(numOfTasks); logger.trace( @@ -83,12 +86,8 @@ export interface TaskRunner { cleanup: () => Promise; } -export const initTaskRunner = ( - numOfTasks: number, - workerPath: string, - runtime: WorkerRuntime = 'child_process', -): TaskRunner => { - const pool = createWorkerPool(numOfTasks, workerPath, runtime); +export const initTaskRunner = (options: WorkerOptions): TaskRunner => { + const pool = createWorkerPool(options); return { run: (task: T) => pool.run(task), cleanup: () => cleanupWorkerPool(pool), diff --git a/tests/shared/processConcurrency.test.ts b/tests/shared/processConcurrency.test.ts index 888473ce8..d86fef201 100644 --- a/tests/shared/processConcurrency.test.ts +++ b/tests/shared/processConcurrency.test.ts @@ -73,7 +73,7 @@ describe('processConcurrency', () => { it('should initialize Tinypool with correct configuration', () => { const workerPath = '/path/to/worker.js'; - const tinypool = createWorkerPool(500, workerPath); + const tinypool = createWorkerPool({ numOfTasks: 500, workerPath }); expect(Tinypool).toHaveBeenCalledWith({ filename: workerPath, @@ -90,7 +90,7 @@ describe('processConcurrency', () => { it('should initialize Tinypool with worker_threads runtime when specified', () => { const workerPath = '/path/to/worker.js'; - const tinypool = createWorkerPool(500, workerPath, 'worker_threads'); + const tinypool = createWorkerPool({ numOfTasks: 500, workerPath, runtime: 'worker_threads' }); expect(Tinypool).toHaveBeenCalledWith({ filename: workerPath, @@ -120,7 +120,7 @@ describe('processConcurrency', () => { it('should return a TaskRunner with run and cleanup methods', () => { const workerPath = '/path/to/worker.js'; - const taskRunner = initTaskRunner(100, workerPath); + const taskRunner = initTaskRunner({ numOfTasks: 100, workerPath }); expect(taskRunner).toHaveProperty('run'); expect(taskRunner).toHaveProperty('cleanup'); @@ -130,7 +130,7 @@ describe('processConcurrency', () => { it('should pass runtime parameter to createWorkerPool', () => { const workerPath = '/path/to/worker.js'; - const taskRunner = initTaskRunner(100, workerPath, 'worker_threads'); + const taskRunner = initTaskRunner({ numOfTasks: 100, workerPath, runtime: 'worker_threads' }); expect(Tinypool).toHaveBeenCalledWith( expect.objectContaining({ From ddd2814f8439f0b2c2e6280011b6e66fad579991 Mon Sep 17 00:00:00 2001 From: Kazuki Yamada Date: Sun, 31 Aug 2025 16:32:49 +0900 Subject: [PATCH 3/5] fix(tests): Update test mocks to use new WorkerOptions interface --- src/shared/processConcurrency.ts | 2 +- tests/cli/cliReport.binaryFiles.test.ts | 1 - tests/core/file/fileCollect.test.ts | 10 ++++------ tests/core/file/fileProcess.test.ts | 4 ++-- tests/core/metrics/calculateOutputMetrics.test.ts | 12 ++++++------ .../metrics/calculateSelectiveFileMetrics.test.ts | 4 ++-- tests/core/security/securityCheck.test.ts | 6 +++--- tests/integration-tests/packager.test.ts | 4 ++-- 8 files changed, 20 insertions(+), 23 deletions(-) diff --git a/src/shared/processConcurrency.ts b/src/shared/processConcurrency.ts index ba2908159..3c84a269f 100644 --- a/src/shared/processConcurrency.ts +++ b/src/shared/processConcurrency.ts @@ -1,5 +1,5 @@ import os from 'node:os'; -import { Options, Tinypool } from 'tinypool'; +import { type Options, Tinypool } from 'tinypool'; import { logger } from './logger.js'; export type WorkerRuntime = NonNullable; diff --git a/tests/cli/cliReport.binaryFiles.test.ts b/tests/cli/cliReport.binaryFiles.test.ts index 207ee8034..890e65f50 100644 --- a/tests/cli/cliReport.binaryFiles.test.ts +++ b/tests/cli/cliReport.binaryFiles.test.ts @@ -1,4 +1,3 @@ -import path from 'node:path'; import { beforeEach, describe, expect, test, vi } from 'vitest'; import { reportSkippedFiles } from '../../src/cli/cliReport.js'; import type { SkippedFileInfo } from '../../src/core/file/fileCollect.js'; diff --git a/tests/core/file/fileCollect.test.ts b/tests/core/file/fileCollect.test.ts index c1edb9cf9..356df6d8d 100644 --- a/tests/core/file/fileCollect.test.ts +++ b/tests/core/file/fileCollect.test.ts @@ -9,7 +9,7 @@ import { collectFiles } from '../../../src/core/file/fileCollect.js'; import type { FileCollectTask } from '../../../src/core/file/workers/fileCollectWorker.js'; import fileCollectWorker from '../../../src/core/file/workers/fileCollectWorker.js'; import { logger } from '../../../src/shared/logger.js'; -import type { WorkerRuntime } from '../../../src/shared/processConcurrency.js'; +import type { WorkerOptions, WorkerRuntime } from '../../../src/shared/processConcurrency.js'; import { createMockConfig } from '../../testing/testUtils.js'; // Define the max file size constant for tests @@ -23,9 +23,7 @@ vi.mock('../../../src/shared/logger'); interface MockInitTaskRunner { ( - numOfTasks: number, - workerPath: string, - runtime?: WorkerRuntime, + options: WorkerOptions, ): { run: (task: T) => Promise; cleanup: () => Promise; @@ -33,9 +31,9 @@ interface MockInitTaskRunner { lastRuntime?: WorkerRuntime; } -const mockInitTaskRunner = (_numOfTasks: number, _workerPath: string, runtime?: WorkerRuntime) => { +const mockInitTaskRunner = (options: WorkerOptions) => { // Store runtime for verification in tests - (mockInitTaskRunner as MockInitTaskRunner).lastRuntime = runtime; + (mockInitTaskRunner as MockInitTaskRunner).lastRuntime = options.runtime; return { run: async (task: T) => { return (await fileCollectWorker(task as FileCollectTask)) as R; diff --git a/tests/core/file/fileProcess.test.ts b/tests/core/file/fileProcess.test.ts index e5c0f9238..f21baf5b7 100644 --- a/tests/core/file/fileProcess.test.ts +++ b/tests/core/file/fileProcess.test.ts @@ -5,7 +5,7 @@ import { processContent } from '../../../src/core/file/fileProcessContent.js'; import type { RawFile } from '../../../src/core/file/fileTypes.js'; import type { FileProcessTask } from '../../../src/core/file/workers/fileProcessWorker.js'; import fileProcessWorker from '../../../src/core/file/workers/fileProcessWorker.js'; -import type { WorkerRuntime } from '../../../src/shared/processConcurrency.js'; +import type { WorkerOptions } from '../../../src/shared/processConcurrency.js'; import { createMockConfig } from '../../testing/testUtils.js'; const createMockFileManipulator = (): FileManipulator => ({ @@ -20,7 +20,7 @@ const mockGetFileManipulator = (filePath: string): FileManipulator | null => { return null; }; -const mockInitTaskRunner = (_numOfTasks: number, _workerPath: string, _runtime?: WorkerRuntime) => { +const mockInitTaskRunner = (_options: WorkerOptions) => { return { run: async (task: T) => { return (await fileProcessWorker(task as FileProcessTask)) as R; diff --git a/tests/core/metrics/calculateOutputMetrics.test.ts b/tests/core/metrics/calculateOutputMetrics.test.ts index d44fd0697..aec40554d 100644 --- a/tests/core/metrics/calculateOutputMetrics.test.ts +++ b/tests/core/metrics/calculateOutputMetrics.test.ts @@ -3,11 +3,11 @@ import { calculateOutputMetrics } from '../../../src/core/metrics/calculateOutpu import type { OutputMetricsTask } from '../../../src/core/metrics/workers/outputMetricsWorker.js'; import outputMetricsWorker from '../../../src/core/metrics/workers/outputMetricsWorker.js'; import { logger } from '../../../src/shared/logger.js'; -import type { WorkerRuntime } from '../../../src/shared/processConcurrency.js'; +import type { WorkerOptions } from '../../../src/shared/processConcurrency.js'; vi.mock('../../../src/shared/logger'); -const mockInitTaskRunner = (_numOfTasks: number, _workerPath: string, _runtime?: WorkerRuntime) => { +const mockInitTaskRunner = (_options: WorkerOptions) => { return { run: async (task: T) => { return (await outputMetricsWorker(task as OutputMetricsTask)) as R; @@ -47,7 +47,7 @@ describe('calculateOutputMetrics', () => { const encoding = 'o200k_base'; const mockError = new Error('Worker error'); - const mockErrorTaskRunner = (_numOfTasks: number, _workerPath: string, _runtime?: WorkerRuntime) => { + const mockErrorTaskRunner = (_options: WorkerOptions) => { return { run: async (_task: T) => { throw mockError; @@ -97,7 +97,7 @@ describe('calculateOutputMetrics', () => { const path = 'large-file.txt'; let chunksProcessed = 0; - const mockParallelTaskRunner = (_numOfTasks: number, _workerPath: string, _runtime?: WorkerRuntime) => { + const mockParallelTaskRunner = (_options: WorkerOptions) => { return { run: async (_task: T) => { chunksProcessed++; @@ -123,7 +123,7 @@ describe('calculateOutputMetrics', () => { const encoding = 'o200k_base'; const mockError = new Error('Parallel processing error'); - const mockErrorTaskRunner = (_numOfTasks: number, _workerPath: string, _runtime?: WorkerRuntime) => { + const mockErrorTaskRunner = (_options: WorkerOptions) => { return { run: async (_task: T) => { throw mockError; @@ -148,7 +148,7 @@ describe('calculateOutputMetrics', () => { const encoding = 'o200k_base'; const processedChunks: string[] = []; - const mockChunkTrackingTaskRunner = (_numOfTasks: number, _workerPath: string, _runtime?: WorkerRuntime) => { + const mockChunkTrackingTaskRunner = (_options: WorkerOptions) => { return { run: async (task: T) => { const outputTask = task as OutputMetricsTask; diff --git a/tests/core/metrics/calculateSelectiveFileMetrics.test.ts b/tests/core/metrics/calculateSelectiveFileMetrics.test.ts index 462646f76..2e8b06e22 100644 --- a/tests/core/metrics/calculateSelectiveFileMetrics.test.ts +++ b/tests/core/metrics/calculateSelectiveFileMetrics.test.ts @@ -3,14 +3,14 @@ import type { ProcessedFile } from '../../../src/core/file/fileTypes.js'; import { calculateSelectiveFileMetrics } from '../../../src/core/metrics/calculateSelectiveFileMetrics.js'; import type { FileMetricsTask } from '../../../src/core/metrics/workers/fileMetricsWorker.js'; import fileMetricsWorker from '../../../src/core/metrics/workers/fileMetricsWorker.js'; -import type { WorkerRuntime } from '../../../src/shared/processConcurrency.js'; +import type { WorkerOptions } from '../../../src/shared/processConcurrency.js'; import type { RepomixProgressCallback } from '../../../src/shared/types.js'; vi.mock('../../shared/processConcurrency', () => ({ getProcessConcurrency: () => 1, })); -const mockInitTaskRunner = (_numOfTasks: number, _workerPath: string, _runtime?: WorkerRuntime) => { +const mockInitTaskRunner = (_options: WorkerOptions) => { return { run: async (task: T) => { return (await fileMetricsWorker(task as FileMetricsTask)) as R; diff --git a/tests/core/security/securityCheck.test.ts b/tests/core/security/securityCheck.test.ts index 1f7a1cc5e..be010854b 100644 --- a/tests/core/security/securityCheck.test.ts +++ b/tests/core/security/securityCheck.test.ts @@ -9,7 +9,7 @@ import type { SecurityCheckTask } from '../../../src/core/security/workers/secur import securityCheckWorker from '../../../src/core/security/workers/securityCheckWorker.js'; import { logger } from '../../../src/shared/logger.js'; import { repomixLogLevels } from '../../../src/shared/logger.js'; -import type { WorkerRuntime } from '../../../src/shared/processConcurrency.js'; +import type { WorkerOptions } from '../../../src/shared/processConcurrency.js'; vi.mock('../../../src/shared/logger'); vi.mock('../../../src/shared/processConcurrency', () => ({ @@ -40,7 +40,7 @@ const mockFiles: RawFile[] = [ }, ]; -const mockInitTaskRunner = (_numOfTasks: number, _workerPath: string, _runtime?: WorkerRuntime) => { +const mockInitTaskRunner = (_options: WorkerOptions) => { return { run: async (task: T) => { return (await securityCheckWorker(task as SecurityCheckTask)) as R; @@ -79,7 +79,7 @@ describe('runSecurityCheck', () => { it('should handle worker errors gracefully', async () => { const mockError = new Error('Worker error'); - const mockErrorTaskRunner = (_numOfTasks?: number, _workerPath?: string, _runtime?: WorkerRuntime) => { + const mockErrorTaskRunner = (_options?: WorkerOptions) => { return { run: async () => { throw mockError; diff --git a/tests/integration-tests/packager.test.ts b/tests/integration-tests/packager.test.ts index 2058a8d4a..f1f828f92 100644 --- a/tests/integration-tests/packager.test.ts +++ b/tests/integration-tests/packager.test.ts @@ -24,14 +24,14 @@ import { copyToClipboardIfEnabled } from '../../src/core/packager/copyToClipboar import { writeOutputToDisk } from '../../src/core/packager/writeOutputToDisk.js'; import { filterOutUntrustedFiles } from '../../src/core/security/filterOutUntrustedFiles.js'; import { validateFileSafety } from '../../src/core/security/validateFileSafety.js'; -import type { WorkerRuntime } from '../../src/shared/processConcurrency.js'; +import type { WorkerOptions } from '../../src/shared/processConcurrency.js'; import { isWindows } from '../testing/testUtils.js'; const fixturesDir = path.join(__dirname, 'fixtures', 'packager'); const inputsDir = path.join(fixturesDir, 'inputs'); const outputsDir = path.join(fixturesDir, 'outputs'); -const mockCollectFileInitTaskRunner = (_numOfTasks: number, _workerPath: string, _runtime?: WorkerRuntime) => { +const mockCollectFileInitTaskRunner = (_options: WorkerOptions) => { return { run: async (task: T) => { return (await fileCollectWorker(task as FileCollectTask)) as R; From a6f0107e41684425744f308547c0f76955624d06 Mon Sep 17 00:00:00 2001 From: Kazuki Yamada Date: Sun, 31 Aug 2025 16:39:54 +0900 Subject: [PATCH 4/5] docs(core): Add comment explaining worker_threads usage in file collection --- src/core/file/fileCollect.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/core/file/fileCollect.ts b/src/core/file/fileCollect.ts index e5f1c2894..8c90644e6 100644 --- a/src/core/file/fileCollect.ts +++ b/src/core/file/fileCollect.ts @@ -26,7 +26,8 @@ export const collectFiles = async ( const taskRunner = deps.initTaskRunner({ numOfTasks: filePaths.length, workerPath: new URL('./workers/fileCollectWorker.js', import.meta.url).href, - runtime: 'worker_threads', + // Use worker_threads for file collection - low memory leak risk + runtime: 'worker_threads' }); const tasks = filePaths.map( (filePath) => From 4d9b52cc91770787d29469fdaed6bb3493f83129 Mon Sep 17 00:00:00 2001 From: Kazuki Yamada Date: Sun, 31 Aug 2025 16:49:46 +0900 Subject: [PATCH 5/5] style(core): Fix trailing comma in file collection worker options --- src/core/file/fileCollect.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/file/fileCollect.ts b/src/core/file/fileCollect.ts index 8c90644e6..e22668bf5 100644 --- a/src/core/file/fileCollect.ts +++ b/src/core/file/fileCollect.ts @@ -27,7 +27,7 @@ export const collectFiles = async ( numOfTasks: filePaths.length, workerPath: new URL('./workers/fileCollectWorker.js', import.meta.url).href, // Use worker_threads for file collection - low memory leak risk - runtime: 'worker_threads' + runtime: 'worker_threads', }); const tasks = filePaths.map( (filePath) =>