Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmarks/memory/src/memory-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const flags = {

// Extract numeric arguments
const numericArgs = args.filter((arg) => !arg.startsWith('-') && !Number.isNaN(Number(arg)));
const iterations = Number(numericArgs[0]) || (flags.full ? 200 : 50);
const iterations = Number(numericArgs[0]) || (flags.full ? 200 : 100);
Comment thread
yamadashy marked this conversation as resolved.
const delay = Number(numericArgs[1]) || (flags.full ? 100 : 50);

Comment thread
yamadashy marked this conversation as resolved.
// Configuration
Expand Down
10 changes: 6 additions & 4 deletions src/core/file/fileCollect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ export const collectFiles = async (
initTaskRunner,
},
): Promise<FileCollectResults> => {
const taskRunner = deps.initTaskRunner<FileCollectTask, FileCollectResult>(
filePaths.length,
new URL('./workers/fileCollectWorker.js', import.meta.url).href,
);
const taskRunner = deps.initTaskRunner<FileCollectTask, FileCollectResult>({
numOfTasks: filePaths.length,
workerPath: new URL('./workers/fileCollectWorker.js', import.meta.url).href,
// Use worker_threads for file collection - low memory leak risk
runtime: 'worker_threads',
});
const tasks = filePaths.map(
(filePath) =>
({
Expand Down
8 changes: 4 additions & 4 deletions src/core/file/fileProcess.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ export const processFiles = async (
getFileManipulator,
},
): Promise<ProcessedFile[]> => {
const taskRunner = deps.initTaskRunner<FileProcessTask, ProcessedFile>(
rawFiles.length,
new URL('./workers/fileProcessWorker.js', import.meta.url).href,
);
const taskRunner = deps.initTaskRunner<FileProcessTask, ProcessedFile>({
numOfTasks: rawFiles.length,
workerPath: new URL('./workers/fileProcessWorker.js', import.meta.url).href,
});
const tasks = rawFiles.map(
(rawFile, _index) =>
({
Expand Down
8 changes: 4 additions & 4 deletions src/core/file/globbyExecute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ export const executeGlobbyInWorker = async (
initTaskRunner,
},
): Promise<string[]> => {
const taskRunner = deps.initTaskRunner<GlobbyTask, string[]>(
1,
new URL('./workers/globbyWorker.js', import.meta.url).href,
);
const taskRunner = deps.initTaskRunner<GlobbyTask, string[]>({
numOfTasks: 1,
workerPath: new URL('./workers/globbyWorker.js', import.meta.url).href,
});

try {
logger.trace('Starting globby in worker for memory isolation');
Expand Down
8 changes: 4 additions & 4 deletions src/core/metrics/calculateGitDiffMetrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ export const calculateGitDiffMetrics = async (
return 0;
}

const taskRunner = deps.initTaskRunner<GitDiffMetricsTask, number>(
1, // Single task for git diff calculation
new URL('./workers/gitDiffMetricsWorker.js', import.meta.url).href,
);
const taskRunner = deps.initTaskRunner<GitDiffMetricsTask, number>({
numOfTasks: 1, // Single task for git diff calculation
workerPath: new URL('./workers/gitDiffMetricsWorker.js', import.meta.url).href,
});

try {
const startTime = process.hrtime.bigint();
Expand Down
8 changes: 4 additions & 4 deletions src/core/metrics/calculateGitLogMetrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ export const calculateGitLogMetrics = async (
};
}

const taskRunner = deps.initTaskRunner<GitLogMetricsTask, number>(
1, // Single task for git log calculation
new URL('./workers/gitLogMetricsWorker.js', import.meta.url).href,
);
const taskRunner = deps.initTaskRunner<GitLogMetricsTask, number>({
numOfTasks: 1, // Single task for git log calculation
workerPath: new URL('./workers/gitLogMetricsWorker.js', import.meta.url).href,
});

try {
const startTime = process.hrtime.bigint();
Expand Down
6 changes: 3 additions & 3 deletions src/core/metrics/calculateOutputMetrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ export const calculateOutputMetrics = async (
): Promise<number> => {
const shouldRunInParallel = content.length > MIN_CONTENT_LENGTH_FOR_PARALLEL;
const numOfTasks = shouldRunInParallel ? CHUNK_SIZE : 1;
const taskRunner = deps.initTaskRunner<OutputMetricsTask, number>(
const taskRunner = deps.initTaskRunner<OutputMetricsTask, number>({
numOfTasks,
new URL('./workers/outputMetricsWorker.js', import.meta.url).href,
);
workerPath: new URL('./workers/outputMetricsWorker.js', import.meta.url).href,
});

try {
logger.trace(`Starting output token count for ${path || 'output'}`);
Expand Down
8 changes: 4 additions & 4 deletions src/core/metrics/calculateSelectiveFileMetrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ export const calculateSelectiveFileMetrics = async (
return [];
}

const taskRunner = deps.initTaskRunner<FileMetricsTask, FileMetrics>(
filesToProcess.length,
new URL('./workers/fileMetricsWorker.js', import.meta.url).href,
);
const taskRunner = deps.initTaskRunner<FileMetricsTask, FileMetrics>({
numOfTasks: filesToProcess.length,
workerPath: new URL('./workers/fileMetricsWorker.js', import.meta.url).href,
});
const tasks = filesToProcess.map(
(file, index) =>
({
Expand Down
8 changes: 4 additions & 4 deletions src/core/security/securityCheck.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ export const runSecurityCheck = async (
}
}

const taskRunner = deps.initTaskRunner<SecurityCheckTask, SuspiciousFileResult | null>(
rawFiles.length + gitDiffTasks.length + gitLogTasks.length,
new URL('./workers/securityCheckWorker.js', import.meta.url).href,
);
const taskRunner = deps.initTaskRunner<SecurityCheckTask, SuspiciousFileResult | null>({
numOfTasks: rawFiles.length + gitDiffTasks.length + gitLogTasks.length,
workerPath: new URL('./workers/securityCheckWorker.js', import.meta.url).href,
});
const fileTasks = rawFiles.map(
(file) =>
({
Expand Down
22 changes: 15 additions & 7 deletions src/shared/processConcurrency.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import os from 'node:os';
import { Tinypool } from 'tinypool';
import { type Options, Tinypool } from 'tinypool';
import { logger } from './logger.js';

/**
 * Runtime values accepted by Tinypool's `runtime` option, with `undefined` excluded.
 * Derived from Tinypool's own `Options` type so it stays in sync with the library.
 */
export type WorkerRuntime = NonNullable<Options['runtime']>;

/**
 * Options accepted by `createWorkerPool` and `initTaskRunner`.
 */
export interface WorkerOptions {
/** Number of tasks the pool is expected to handle; passed to `getWorkerThreadCount` to size the pool. */
numOfTasks: number;
/** Location of the worker module; forwarded to Tinypool as `filename`. */
workerPath: string;
/** Runtime used for the pool's workers. `createWorkerPool` falls back to `'child_process'` when omitted. */
runtime?: WorkerRuntime;
}

// Worker initialization is expensive, so we prefer fewer threads unless there are many files
const TASKS_PER_THREAD = 100;

Expand All @@ -23,19 +31,19 @@ export const getWorkerThreadCount = (numOfTasks: number): { minThreads: number;
};
};

export const createWorkerPool = (numOfTasks: number, workerPath: string): Tinypool => {
export const createWorkerPool = (options: WorkerOptions): Tinypool => {
const { numOfTasks, workerPath, runtime = 'child_process' } = options;
const { minThreads, maxThreads } = getWorkerThreadCount(numOfTasks);

logger.trace(
`Initializing worker pool with min=${minThreads}, max=${maxThreads} threads. Worker path: ${workerPath}`,
`Initializing worker pool with min=${minThreads}, max=${maxThreads} threads, runtime=${runtime}. Worker path: ${workerPath}`,
);

const startTime = process.hrtime.bigint();

const pool = new Tinypool({
filename: workerPath,
// Use child_process for better memory management
runtime: 'child_process',
runtime,
minThreads,
maxThreads,
idleTimeout: 5000,
Expand Down Expand Up @@ -78,8 +86,8 @@ export interface TaskRunner<T, R> {
cleanup: () => Promise<void>;
}

export const initTaskRunner = <T, R>(numOfTasks: number, workerPath: string): TaskRunner<T, R> => {
const pool = createWorkerPool(numOfTasks, workerPath);
export const initTaskRunner = <T, R>(options: WorkerOptions): TaskRunner<T, R> => {
const pool = createWorkerPool(options);
return {
run: (task: T) => pool.run(task),
cleanup: () => cleanupWorkerPool(pool),
Expand Down
1 change: 0 additions & 1 deletion tests/cli/cliReport.binaryFiles.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import path from 'node:path';
import { beforeEach, describe, expect, test, vi } from 'vitest';
import { reportSkippedFiles } from '../../src/cli/cliReport.js';
import type { SkippedFileInfo } from '../../src/core/file/fileCollect.js';
Expand Down
32 changes: 31 additions & 1 deletion tests/core/file/fileCollect.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { collectFiles } from '../../../src/core/file/fileCollect.js';
import type { FileCollectTask } from '../../../src/core/file/workers/fileCollectWorker.js';
import fileCollectWorker from '../../../src/core/file/workers/fileCollectWorker.js';
import { logger } from '../../../src/shared/logger.js';
import type { WorkerOptions, WorkerRuntime } from '../../../src/shared/processConcurrency.js';
import { createMockConfig } from '../../testing/testUtils.js';

// Define the max file size constant for tests
Expand All @@ -20,7 +21,19 @@ vi.mock('jschardet');
vi.mock('iconv-lite');
vi.mock('../../../src/shared/logger');

const mockInitTaskRunner = <T, R>(_numOfTasks: number, _workerPath: string) => {
/**
 * Shape of the `mockInitTaskRunner` test double: a callable that mirrors the
 * `initTaskRunner` signature and additionally exposes the runtime it was last
 * invoked with, so tests can assert on it.
 */
interface MockInitTaskRunner {
// Call signature: accepts the worker options and returns a run/cleanup pair.
<T, R>(
options: WorkerOptions,
): {
run: (task: T) => Promise<R>;
cleanup: () => Promise<void>;
};
// `runtime` option from the most recent call; undefined when the caller omitted it.
lastRuntime?: WorkerRuntime;
}

const mockInitTaskRunner = <T, R>(options: WorkerOptions) => {
// Store runtime for verification in tests
(mockInitTaskRunner as MockInitTaskRunner).lastRuntime = options.runtime;
return {
run: async (task: T) => {
return (await fileCollectWorker(task as FileCollectTask)) as R;
Expand Down Expand Up @@ -195,4 +208,21 @@ describe('fileCollect', () => {
expect.any(Error),
);
});

it('should use worker_threads runtime when calling initTaskRunner', async () => {
  const mockFilePaths = ['test.txt'];
  const mockRootDir = '/root';
  const mockConfig = createMockConfig();

  // The mock records the runtime on the shared function object, and every earlier
  // test that used this mock has written to it. Clear it first so the assertion
  // below can only be satisfied by the collectFiles call made in this test.
  const runtimeRecorder = mockInitTaskRunner as MockInitTaskRunner;
  runtimeRecorder.lastRuntime = undefined;

  vi.mocked(isBinary).mockReturnValue(false);
  vi.mocked(fs.readFile).mockResolvedValue(Buffer.from('file content'));
  vi.mocked(jschardet.detect).mockReturnValue({ encoding: 'utf-8', confidence: 0.99 });
  vi.mocked(iconv.decode).mockReturnValue('decoded content');

  await collectFiles(mockFilePaths, mockRootDir, mockConfig, () => {}, {
    initTaskRunner: mockInitTaskRunner,
  });

  // File collection is expected to opt into worker_threads explicitly.
  expect(runtimeRecorder.lastRuntime).toBe('worker_threads');
});
});
3 changes: 2 additions & 1 deletion tests/core/file/fileProcess.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { processContent } from '../../../src/core/file/fileProcessContent.js';
import type { RawFile } from '../../../src/core/file/fileTypes.js';
import type { FileProcessTask } from '../../../src/core/file/workers/fileProcessWorker.js';
import fileProcessWorker from '../../../src/core/file/workers/fileProcessWorker.js';
import type { WorkerOptions } from '../../../src/shared/processConcurrency.js';
import { createMockConfig } from '../../testing/testUtils.js';

const createMockFileManipulator = (): FileManipulator => ({
Expand All @@ -19,7 +20,7 @@ const mockGetFileManipulator = (filePath: string): FileManipulator | null => {
return null;
};

const mockInitTaskRunner = <T, R>(_numOfTasks: number, _workerPath: string) => {
const mockInitTaskRunner = <T, R>(_options: WorkerOptions) => {
return {
run: async (task: T) => {
return (await fileProcessWorker(task as FileProcessTask)) as R;
Expand Down
11 changes: 6 additions & 5 deletions tests/core/metrics/calculateOutputMetrics.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ import { calculateOutputMetrics } from '../../../src/core/metrics/calculateOutpu
import type { OutputMetricsTask } from '../../../src/core/metrics/workers/outputMetricsWorker.js';
import outputMetricsWorker from '../../../src/core/metrics/workers/outputMetricsWorker.js';
import { logger } from '../../../src/shared/logger.js';
import type { WorkerOptions } from '../../../src/shared/processConcurrency.js';

vi.mock('../../../src/shared/logger');

const mockInitTaskRunner = <T, R>(_numOfTasks: number, _workerPath: string) => {
const mockInitTaskRunner = <T, R>(_options: WorkerOptions) => {
return {
run: async (task: T) => {
return (await outputMetricsWorker(task as OutputMetricsTask)) as R;
Expand Down Expand Up @@ -46,7 +47,7 @@ describe('calculateOutputMetrics', () => {
const encoding = 'o200k_base';
const mockError = new Error('Worker error');

const mockErrorTaskRunner = <T, _R>(_numOfTasks: number, _workerPath: string) => {
const mockErrorTaskRunner = <T, _R>(_options: WorkerOptions) => {
return {
run: async (_task: T) => {
throw mockError;
Expand Down Expand Up @@ -96,7 +97,7 @@ describe('calculateOutputMetrics', () => {
const path = 'large-file.txt';

let chunksProcessed = 0;
const mockParallelTaskRunner = <T, R>(_numOfTasks: number, _workerPath: string) => {
const mockParallelTaskRunner = <T, R>(_options: WorkerOptions) => {
return {
run: async (_task: T) => {
chunksProcessed++;
Expand All @@ -122,7 +123,7 @@ describe('calculateOutputMetrics', () => {
const encoding = 'o200k_base';
const mockError = new Error('Parallel processing error');

const mockErrorTaskRunner = <T, _R>(_numOfTasks: number, _workerPath: string) => {
const mockErrorTaskRunner = <T, _R>(_options: WorkerOptions) => {
return {
run: async (_task: T) => {
throw mockError;
Expand All @@ -147,7 +148,7 @@ describe('calculateOutputMetrics', () => {
const encoding = 'o200k_base';
const processedChunks: string[] = [];

const mockChunkTrackingTaskRunner = <T, R>(_numOfTasks: number, _workerPath: string) => {
const mockChunkTrackingTaskRunner = <T, R>(_options: WorkerOptions) => {
return {
run: async (task: T) => {
const outputTask = task as OutputMetricsTask;
Expand Down
3 changes: 2 additions & 1 deletion tests/core/metrics/calculateSelectiveFileMetrics.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ import type { ProcessedFile } from '../../../src/core/file/fileTypes.js';
import { calculateSelectiveFileMetrics } from '../../../src/core/metrics/calculateSelectiveFileMetrics.js';
import type { FileMetricsTask } from '../../../src/core/metrics/workers/fileMetricsWorker.js';
import fileMetricsWorker from '../../../src/core/metrics/workers/fileMetricsWorker.js';
import type { WorkerOptions } from '../../../src/shared/processConcurrency.js';
import type { RepomixProgressCallback } from '../../../src/shared/types.js';

vi.mock('../../shared/processConcurrency', () => ({
getProcessConcurrency: () => 1,
}));

const mockInitTaskRunner = <T, R>(_numOfTasks: number, _workerPath: string) => {
const mockInitTaskRunner = <T, R>(_options: WorkerOptions) => {
return {
run: async (task: T) => {
return (await fileMetricsWorker(task as FileMetricsTask)) as R;
Expand Down
5 changes: 3 additions & 2 deletions tests/core/security/securityCheck.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type { SecurityCheckTask } from '../../../src/core/security/workers/secur
import securityCheckWorker from '../../../src/core/security/workers/securityCheckWorker.js';
import { logger } from '../../../src/shared/logger.js';
import { repomixLogLevels } from '../../../src/shared/logger.js';
import type { WorkerOptions } from '../../../src/shared/processConcurrency.js';

vi.mock('../../../src/shared/logger');
vi.mock('../../../src/shared/processConcurrency', () => ({
Expand Down Expand Up @@ -39,7 +40,7 @@ const mockFiles: RawFile[] = [
},
];

const mockInitTaskRunner = <T, R>(_numOfTasks: number, _workerPath: string) => {
const mockInitTaskRunner = <T, R>(_options: WorkerOptions) => {
return {
run: async (task: T) => {
return (await securityCheckWorker(task as SecurityCheckTask)) as R;
Expand Down Expand Up @@ -78,7 +79,7 @@ describe('runSecurityCheck', () => {

it('should handle worker errors gracefully', async () => {
const mockError = new Error('Worker error');
const mockErrorTaskRunner = () => {
const mockErrorTaskRunner = (_options?: WorkerOptions) => {
return {
run: async () => {
throw mockError;
Expand Down
3 changes: 2 additions & 1 deletion tests/integration-tests/packager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import { copyToClipboardIfEnabled } from '../../src/core/packager/copyToClipboar
import { writeOutputToDisk } from '../../src/core/packager/writeOutputToDisk.js';
import { filterOutUntrustedFiles } from '../../src/core/security/filterOutUntrustedFiles.js';
import { validateFileSafety } from '../../src/core/security/validateFileSafety.js';
import type { WorkerOptions } from '../../src/shared/processConcurrency.js';
import { isWindows } from '../testing/testUtils.js';

const fixturesDir = path.join(__dirname, 'fixtures', 'packager');
const inputsDir = path.join(fixturesDir, 'inputs');
const outputsDir = path.join(fixturesDir, 'outputs');

const mockCollectFileInitTaskRunner = <T, R>(_numOfTasks: number, _workerPath: string) => {
const mockCollectFileInitTaskRunner = <T, R>(_options: WorkerOptions) => {
return {
run: async (task: T) => {
return (await fileCollectWorker(task as FileCollectTask)) as R;
Expand Down
Loading
Loading