Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 35 additions & 3 deletions src/core/metrics/TokenCounter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import { logger } from '../../shared/logger.js';
// Supported gpt-tokenizer BPE encoding names, from newest (o200k_base) to oldest.
export const TOKEN_ENCODINGS = ['o200k_base', 'cl100k_base', 'p50k_base', 'p50k_edit', 'r50k_base'] as const;
// Union of the encoding-name string literals above.
export type TokenEncoding = (typeof TOKEN_ENCODINGS)[number];

// BPE rank data type returned by resolveEncodingAsync.
// Matches gpt-tokenizer's RawBytePairRanks: each entry is either a base64 string
// or an array of raw byte values.
export type BpeRanks = readonly (string | readonly number[])[];

// Options forwarded to gpt-tokenizer's countTokens.
// NOTE(review): disallowedSpecial presumably names special tokens to reject in
// input text — confirm against gpt-tokenizer's EncodeOptions documentation.
interface CountTokensOptions {
  disallowedSpecial?: Set<string>;
}
Expand All @@ -19,6 +24,13 @@ const PLAIN_TEXT_OPTIONS: CountTokensOptions = { disallowedSpecial: new Set() };
// Lazy-loaded countTokens functions keyed by encoding
const encodingModules = new Map<string, CountTokensFn>();

// Build a countTokens function for the given encoding from raw BPE rank data,
// cache it in encodingModules for reuse, and return it.
const createEncoderFromBpeRanks = (encodingName: TokenEncoding, bpeRanks: BpeRanks): CountTokensFn => {
  const encodingApi = GptEncoding.getEncodingApi(encodingName, () => bpeRanks);
  const boundCountFn = encodingApi.countTokens.bind(encodingApi) as CountTokensFn;
  encodingModules.set(encodingName, boundCountFn);
  return boundCountFn;
};

const loadEncoding = async (encodingName: TokenEncoding): Promise<CountTokensFn> => {
const cached = encodingModules.get(encodingName);
if (cached) {
Expand All @@ -30,9 +42,7 @@ const loadEncoding = async (encodingName: TokenEncoding): Promise<CountTokensFn>
// Use resolveEncodingAsync to lazily load BPE rank data, then create a GptEncoding instance.
// resolveEncodingAsync uses static import paths internally, so bundlers (rolldown) can resolve them.
const bpeRanks = await resolveEncodingAsync(encodingName);
const encoder = GptEncoding.getEncodingApi(encodingName, () => bpeRanks);
const countFn = encoder.countTokens.bind(encoder) as CountTokensFn;
encodingModules.set(encodingName, countFn);
const countFn = createEncoderFromBpeRanks(encodingName, bpeRanks);

const endTime = process.hrtime.bigint();
const initTime = Number(endTime - startTime) / 1e6;
Expand All @@ -41,6 +51,14 @@ const loadEncoding = async (encodingName: TokenEncoding): Promise<CountTokensFn>
return countFn;
};

/**
 * Pre-load BPE rank data for an encoding. Called on the main thread to load
 * once and share with worker threads, avoiding redundant file I/O per worker.
 */
export const loadBpeRanks = async (encodingName: TokenEncoding): Promise<BpeRanks> => {
  const bpeRanks = await resolveEncodingAsync(encodingName);
  return bpeRanks;
};

export class TokenCounter {
private countFn: CountTokensFn | null = null;
private readonly encodingName: TokenEncoding;
Expand All @@ -53,6 +71,20 @@ export class TokenCounter {
this.countFn = await loadEncoding(this.encodingName);
}

/**
 * Initialize from pre-loaded BPE rank data, skipping the async file I/O.
 * Used by worker threads that receive BPE data from the main thread.
 */
initFromBpeRanks(bpeRanks: BpeRanks): void {
  const t0 = process.hrtime.bigint();
  this.countFn = createEncoderFromBpeRanks(this.encodingName, bpeRanks);
  const elapsedMs = Number(process.hrtime.bigint() - t0) / 1e6;
  logger.debug(
    `TokenCounter initialization from pre-loaded BPE for ${this.encodingName} took ${elapsedMs.toFixed(2)}ms`,
  );
}

public countTokens(content: string, filePath?: string): number {
if (!this.countFn) {
throw new Error('TokenCounter not initialized. Call init() first.');
Expand Down
29 changes: 26 additions & 3 deletions src/core/metrics/calculateMetrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { calculateGitLogMetrics } from './calculateGitLogMetrics.js';
import { calculateOutputMetrics } from './calculateOutputMetrics.js';
import { calculateSelectiveFileMetrics } from './calculateSelectiveFileMetrics.js';
import type { MetricsTaskRunner } from './metricsWorkerRunner.js';
import type { TokenEncoding } from './TokenCounter.js';
import { loadBpeRanks, type TokenEncoding } from './TokenCounter.js';
import type { MetricsWorkerResult, MetricsWorkerTask } from './workers/calculateMetricsWorker.js';

export interface CalculateMetricsResult {
Expand All @@ -33,17 +33,40 @@ export interface MetricsTaskRunnerWithWarmup {
* gpt-tokenizer initialization in parallel. This allows the expensive module
* loading to overlap with other pipeline stages (security check, file processing,
* output generation).
*
* BPE rank data (~200K entries, ~3.6MB on disk) is pre-loaded once on the main
* thread and sent to each worker as a JSON string (~1.6MB). Workers deserialize
* and build the encoder locally (~73ms) instead of each independently reading
* and parsing the BPE file from disk (~210-330ms). The JSON string serializes
* via structured clone in ~3ms (vs ~26ms for the raw array), making the IPC
* overhead negligible.
*/
export const createMetricsTaskRunner = (numOfTasks: number, encoding: TokenEncoding): MetricsTaskRunnerWithWarmup => {
// Start loading BPE data on the main thread (async I/O, overlaps with pool creation
// and subsequent pipeline stages like searchFiles and collectFiles).
// If pre-loading fails (e.g., missing BPE asset in bundled builds), fall back to
// null so workers load BPE from disk independently (slower but correct).
const bpeRanksJsonPromise = loadBpeRanks(encoding)
.then((bpeRanks) => JSON.stringify(bpeRanks))
.catch(() => null as string | null);

const taskRunner = initTaskRunner<MetricsWorkerTask, MetricsWorkerResult>({
numOfTasks,
workerType: 'calculateMetrics',
runtime: 'worker_threads',
});

const { maxThreads } = getWorkerThreadCount(numOfTasks);
const warmupPromise = Promise.all(
Array.from({ length: maxThreads }, () => taskRunner.run({ content: '', encoding }).catch(() => 0)),

// Once BPE data is loaded, dispatch warmup tasks carrying the pre-serialized data.
// Workers deserialize + build encoder (~73ms) instead of loading from disk (~280ms).
// If bpeRanksJson is null (pre-load failed), workers fall back to disk loading.
const warmupPromise = bpeRanksJsonPromise.then((bpeRanksJson) =>
Promise.all(
Array.from({ length: maxThreads }, () =>
taskRunner.run({ content: '', encoding, ...(bpeRanksJson != null && { bpeRanksJson }) }).catch(() => 0),
),
),
);
Comment on lines +64 to 70
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There is a potential race condition in how the warmup tasks are queued. createMetricsTaskRunner returns the taskRunner immediately, but the warmup tasks are only queued after the BPE data is loaded and serialized (which is an async operation). If the caller submits real tasks to the runner immediately after calling createMetricsTaskRunner, those tasks will likely be queued before the warmup tasks, causing the workers to perform the expensive disk I/O anyway.

To ensure the optimization is effective, the caller must await the warmupPromise before submitting real tasks, or the initialization logic should be adjusted to ensure warmup tasks are prioritized at the head of the queue.


return { taskRunner, warmupPromise };
Expand Down
16 changes: 15 additions & 1 deletion src/core/metrics/tokenCounterFactory.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { logger } from '../../shared/logger.js';
import { TokenCounter, type TokenEncoding } from './TokenCounter.js';
import { type BpeRanks, TokenCounter, type TokenEncoding } from './TokenCounter.js';

// Worker-level cache for TokenCounter instances by encoding
const tokenCounters = new Map<TokenEncoding, TokenCounter>();
Expand All @@ -18,6 +18,20 @@ export const getTokenCounter = async (encoding: TokenEncoding): Promise<TokenCou
return tokenCounter;
};

/**
* Initialize a TokenCounter from pre-loaded BPE rank data.
* Called by worker threads receiving BPE data from the main thread,
* skipping the expensive per-worker file I/O (~105ms per worker).
*/
export const initTokenCounterFromBpeRanks = (encoding: TokenEncoding, bpeRanks: BpeRanks): void => {
if (tokenCounters.has(encoding)) {
return;
}
const tokenCounter = new TokenCounter(encoding);
tokenCounter.initFromBpeRanks(bpeRanks);
tokenCounters.set(encoding, tokenCounter);
};

/**
* Free all TokenCounter resources and clear the cache.
* No-op for gpt-tokenizer (pure JS), but kept for API compatibility.
Expand Down
19 changes: 18 additions & 1 deletion src/core/metrics/workers/calculateMetricsWorker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { logger, setLogLevelByWorkerData } from '../../../shared/logger.js';
import type { TokenEncoding } from '../TokenCounter.js';
import { freeTokenCounters, getTokenCounter } from '../tokenCounterFactory.js';
import { freeTokenCounters, getTokenCounter, initTokenCounterFromBpeRanks } from '../tokenCounterFactory.js';
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Import the new guard function to check for initialization status.

Suggested change
import { freeTokenCounters, getTokenCounter, initTokenCounterFromBpeRanks } from '../tokenCounterFactory.js';
import { freeTokenCounters, getTokenCounter, initTokenCounterFromBpeRanks, isTokenCounterInitialized } from '../tokenCounterFactory.js';


/**
* Token counting worker for metrics calculation.
Expand All @@ -18,6 +18,10 @@ export interface TokenCountTask {
content: string;
encoding: TokenEncoding;
path?: string;
/** Pre-serialized BPE rank data (JSON string) for fast worker initialization.
* When provided (typically in warmup tasks), the worker skips the expensive
* per-worker BPE file I/O (~105ms) and initializes from the pre-loaded data. */
bpeRanksJson?: string;
}

export interface TokenCountBatchItem {
Expand All @@ -37,6 +41,19 @@ export const countTokens = async (task: TokenCountTask): Promise<number> => {
const processStartAt = process.hrtime.bigint();

try {
// Initialize from pre-loaded BPE data if provided (warmup path).
// This avoids each worker independently loading the ~3.6MB BPE file from disk,
// saving ~105ms per worker by receiving the data via IPC instead.
// If parsing fails, getTokenCounter below falls back to disk loading.
if (task.bpeRanksJson) {
try {
const bpeRanks = JSON.parse(task.bpeRanksJson);
initTokenCounterFromBpeRanks(task.encoding, bpeRanks);
} catch {
// Fall through to getTokenCounter which loads from disk
}
}
Comment on lines +48 to +55
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The worker currently parses the bpeRanksJson string every time it's provided in a task. While it's primarily intended for warmup tasks, adding a guard check here prevents expensive and redundant JSON parsing (of a ~1.6MB string) if multiple tasks with BPE data are received by the same worker before it has finished initializing.

Suggested change
if (task.bpeRanksJson) {
try {
const bpeRanks = JSON.parse(task.bpeRanksJson);
initTokenCounterFromBpeRanks(task.encoding, bpeRanks);
} catch {
// Fall through to getTokenCounter which loads from disk
}
}
if (task.bpeRanksJson && !isTokenCounterInitialized(task.encoding)) {
try {
const bpeRanks = JSON.parse(task.bpeRanksJson);
initTokenCounterFromBpeRanks(task.encoding, bpeRanks);
} catch {
// Fall through to getTokenCounter which loads from disk
}
}


const counter = await getTokenCounter(task.encoding);
const tokenCount = counter.countTokens(task.content, task.path);

Expand Down
9 changes: 7 additions & 2 deletions tests/core/metrics/calculateMetrics.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ vi.mock('../../../src/core/metrics/TokenCounter.js', () => {
countTokens: vi.fn().mockReturnValue(10),
free: vi.fn(),
})),
loadBpeRanks: vi.fn().mockResolvedValue(['mock-bpe-data']),
};
});
vi.mock('../../../src/core/metrics/aggregateMetrics.js');
Expand Down Expand Up @@ -113,12 +114,16 @@ describe('createMetricsTaskRunner', () => {
await expect(result.warmupPromise).resolves.toBeDefined();
});

it('should fire a warmup task with empty content', async () => {
it('should fire a warmup task with empty content and pre-loaded BPE data', async () => {
const result = createMetricsTaskRunner(50, 'cl100k_base');

await result.warmupPromise;

expect(result.taskRunner.run).toHaveBeenCalledWith({ content: '', encoding: 'cl100k_base' });
expect(result.taskRunner.run).toHaveBeenCalledWith({
content: '',
encoding: 'cl100k_base',
bpeRanksJson: expect.any(String),
});
});

it('should swallow warmup task errors', async () => {
Expand Down
Loading