71 changes: 71 additions & 0 deletions src/common/event-size-monitor.ts
@@ -0,0 +1,71 @@
import { ErrorRecord } from '../types/common';
import { EventData } from '../types/extraction';

const MAX_EVENT_SIZE_BYTES = 200_000;
const EVENT_SIZE_THRESHOLD_BYTES = Math.floor(MAX_EVENT_SIZE_BYTES * 0.8); // 160_000 bytes

/**
 * Get the JSON serialized size of event data in bytes
 */
export function getEventDataSize(data: EventData | undefined): number {
  if (!data) return 0;
  return JSON.stringify(data).length;
Copilot AI Nov 25, 2025
JSON.stringify().length calculates size in UTF-16 code units, not bytes. For accurate byte size calculation (especially for SQS limits), use Buffer.byteLength(JSON.stringify(data), 'utf8') or new TextEncoder().encode(JSON.stringify(data)).length.

Suggested change
- return JSON.stringify(data).length;
+ return Buffer.byteLength(JSON.stringify(data), 'utf8');
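
To make the distinction in this comment concrete, here is a minimal sketch comparing the two measurements. The helper names `utf16Length` and `utf8ByteLength` are hypothetical and not part of the PR; the sketch uses `TextEncoder`, one of the two options the comment mentions, and assumes a non-undefined input.

```typescript
// Hypothetical helpers for illustration only; they are not part of the PR.

// Counts UTF-16 code units, which is what String.prototype.length reports.
function utf16Length(value: object): number {
  return JSON.stringify(value).length;
}

// Counts the bytes of the UTF-8 encoding of the serialized value.
function utf8ByteLength(value: object): number {
  return new TextEncoder().encode(JSON.stringify(value)).length;
}

// ASCII-only payload: both measurements agree.
const asciiPayload = { message: 'hello' };

// Payload with one non-ASCII character: 'é' is one UTF-16 code unit
// but two bytes in UTF-8, so the byte count is larger.
const accentedPayload = { message: 'héllo' };

console.log(utf16Length(asciiPayload), utf8ByteLength(asciiPayload));
console.log(utf16Length(accentedPayload), utf8ByteLength(accentedPayload));
```

For payloads that are mostly ASCII the two numbers stay close, but text-heavy data with non-ASCII characters can be substantially larger in bytes than its `.length` suggests.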
}

/**
 * Check if event data exceeds the 80% threshold (160KB)
 */
export function shouldTriggerSizeLimit(data: EventData | undefined): boolean {
  return getEventDataSize(data) > EVENT_SIZE_THRESHOLD_BYTES;
}

/**
 * Truncate error message to max length (default 1000 chars)
 */
export function truncateErrorMessage(
  error: ErrorRecord | undefined,
  maxLength: number = 1000
): ErrorRecord | undefined {
  if (!error) return undefined;

  return {
    message: error.message.substring(0, maxLength),
Copilot AI Nov 25, 2025
Potential error if error.message is undefined or null. Add a null check: message: error.message?.substring(0, maxLength) ?? ''

Suggested change
- message: error.message.substring(0, maxLength),
+ message: error.message?.substring(0, maxLength) ?? '',
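
A small self-contained sketch of the null-safe variant suggested here. The `ErrorRecordLike` interface and the `truncateMessage` name are assumptions made for illustration; the real `ErrorRecord` type lives in `'../types/common'` and may have a different shape.

```typescript
// Hypothetical stand-in for ErrorRecord; the real type may differ.
interface ErrorRecordLike {
  message?: string | null;
}

// Truncates the message, falling back to '' when it is null or undefined.
function truncateMessage(
  error: ErrorRecordLike | undefined,
  maxLength: number = 1000
): { message: string } | undefined {
  if (!error) return undefined;

  return {
    // Optional chaining short-circuits on null/undefined, and the
    // nullish coalescing operator then supplies the empty string.
    message: error.message?.substring(0, maxLength) ?? '',
  };
}

console.log(truncateMessage({ message: 'abcdef' }, 3));
console.log(truncateMessage({ message: null }));
console.log(truncateMessage(undefined));
```

Whether an empty string is the right fallback depends on how downstream consumers treat a missing message, which this sketch does not try to decide.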
  };
}

/**
 * Prune event data by truncating error messages
 * Always applied before serialization
 */
export function pruneEventData(
  data: EventData | undefined
): EventData | undefined {
  if (!data) return data;

  return {
    ...data,
    error: truncateErrorMessage(data.error),
  };
}

/**
 * Log detailed warning when size limit is detected
 */
export function logSizeLimitWarning(
  size: number,
  triggerType: 'onUpload' | 'onEmit'
): void {
  const percentage = (size / MAX_EVENT_SIZE_BYTES) * 100;
  const detailsString =
    triggerType === 'onUpload'
      ? 'during data collection. Emitting progress event and stopping further processing.'
      : 'during emit. Error messages truncated.';

  console.warn(
    `[SIZE_LIMIT] Event data size ${size} bytes (${percentage.toFixed(
      1
    )}% of ${MAX_EVENT_SIZE_BYTES} limit) detected ${detailsString}`
  );
}

export { MAX_EVENT_SIZE_BYTES as MAX_EVENT_SIZE, EVENT_SIZE_THRESHOLD_BYTES as SIZE_LIMIT_THRESHOLD };
Copilot AI Nov 25, 2025
[nitpick] Exporting constants with renamed aliases on a separate line from function exports reduces readability. Consider moving these constant exports to the top of the file with the constant declarations or using separate export statements for clarity.
9 changes: 9 additions & 0 deletions src/workers/process-task.ts
@@ -58,6 +58,15 @@ export function processTask<ConnectorState>({
          })()
        );
        await task({ adapter });

        // If size limit was triggered during task, call onTimeout for cleanup
        if (adapter.isTimeout) {
          console.log(
            '[SIZE_LIMIT] Size limit detected during data collection. Executing onTimeout function for cleanup.'
          );
          await onTimeout({ adapter });
        }

        process.exit(0);
      }
    } catch (error) {
31 changes: 30 additions & 1 deletion src/workers/worker-adapter.ts
@@ -7,6 +7,11 @@ import {
  STATELESS_EVENT_TYPES,
} from '../common/constants';
import { emit } from '../common/control-protocol';
import {
  logSizeLimitWarning,
  pruneEventData,
  SIZE_LIMIT_THRESHOLD,
} from '../common/event-size-monitor';
import { addReportToLoaderReport, getFilesToLoad } from '../common/helpers';
import { serializeError } from '../logger/logger';
import { Mappers } from '../mappers/mappers';
@@ -91,6 +96,9 @@ export class WorkerAdapter<ConnectorState> {
  private _mappers: Mappers;
  private uploader: Uploader;

  // Length of the resulting artifact JSON object string.
  private currentLength: number = 0;

  constructor({
    event,
    adapterState,
@@ -149,12 +157,30 @@ export class WorkerAdapter<ConnectorState> {
        itemType: repo.itemType,
        ...(shouldNormalize && { normalize: repo.normalize }),
        onUpload: (artifact: Artifact) => {
          const newLength = JSON.stringify(artifact).length;
Copilot AI Nov 25, 2025
JSON.stringify() is called here and the same artifact is likely stringified again later when emitted. Consider caching the stringified result to avoid redundant serialization.

Suggested change
- const newLength = JSON.stringify(artifact).length;
+ // Cache the stringified artifact to avoid redundant serialization
+ if (!('_stringified' in artifact)) {
+   (artifact as any)._stringified = JSON.stringify(artifact);
+ }
+ const newLength = (artifact as any)._stringified.length;
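
As a possible alternative to attaching a `_stringified` property, the sketch below caches the serialized string in a `WeakMap` so the artifact object itself is never mutated and the cached string cannot end up inside a later serialization. This swaps in a different technique from the one in the suggestion above; `ArtifactLike` and `serializeOnce` are hypothetical names, and the sketch assumes the artifact is not modified after it is first serialized.

```typescript
// Hypothetical artifact shape, for illustration only.
interface ArtifactLike {
  id: string;
  item_count?: number;
}

// Keys are held weakly, so cached strings are released with their objects.
const serializedCache = new WeakMap<object, string>();

// Serializes a value at most once per object identity.
// Assumption: the object is not mutated after the first call,
// otherwise the cached string would be stale.
function serializeOnce(value: object): string {
  let json = serializedCache.get(value);
  if (json === undefined) {
    json = JSON.stringify(value);
    serializedCache.set(value, json);
  }
  return json;
}

const artifact: ArtifactLike = { id: 'artifact-1', item_count: 3 };
const first = serializeOnce(artifact);
const second = serializeOnce(artifact); // served from the cache

console.log(first.length, first === second);
```

Whether caching pays off at all depends on whether the same artifact object really is serialized again later, which the diff does not show.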

          // We need to store artifacts ids in state for later use when streaming attachments
          if (repo.itemType === AIRDROP_DEFAULT_ITEM_TYPES.ATTACHMENTS) {
            this.state.toDevRev?.attachmentsMetadata.artifactIds.push(
              artifact.id
            );
          }

          this.currentLength += newLength;

          // Check for size limit (80% of 200KB = 160KB threshold)
          if (
            this.currentLength > SIZE_LIMIT_THRESHOLD &&
            !this.hasWorkerEmitted
          ) {
Comment on lines +171 to +175
Copilot AI Nov 25, 2025
The comment states '80% of 200KB = 160KB' but this should be 160,000 bytes. The comment should clarify that SIZE_LIMIT_THRESHOLD is defined elsewhere to avoid confusion about the actual numeric value being compared.
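
The arithmetic behind this remark can be checked with a short sketch. The first two constants mirror the declarations in `event-size-monitor.ts`; `BINARY_160_KIB` is a hypothetical name added only to show why "160KB" is ambiguous.

```typescript
// Mirrors the declarations in event-size-monitor.ts.
const MAX_EVENT_SIZE_BYTES = 200_000;
const EVENT_SIZE_THRESHOLD_BYTES = Math.floor(MAX_EVENT_SIZE_BYTES * 0.8);

// Hypothetical constant: 160 KiB under the binary (1024-byte) convention.
const BINARY_160_KIB = 160 * 1024;

console.log(EVENT_SIZE_THRESHOLD_BYTES); // 160000
console.log(BINARY_160_KIB); // 163840
console.log(BINARY_160_KIB - EVENT_SIZE_THRESHOLD_BYTES); // 3840
```

The value actually compared is 160,000, so a reader who interprets "160KB" as 160 × 1024 would be off by 3,840.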
            logSizeLimitWarning(this.currentLength, 'onUpload');

            // Set timeout flag to trigger onTimeout cleanup after task completes
            this.handleTimeout();

            // Emit progress event to save state and continue on next iteration
            void this.emit(ExtractorEventType.ExtractionDataProgress);
          }
        },
        options: this.options,
      });
@@ -246,11 +272,14 @@ export class WorkerAdapter<ConnectorState> {
    }

    try {
      // Always prune error messages to 1000 chars before emit
Copilot AI Nov 25, 2025
[nitpick] The comment says 'chars' but should say 'characters' for consistency with the function documentation in event-size-monitor.ts. Also, like Comment 3, this should reference that the 1000 limit is defined in the truncateErrorMessage function.

Suggested change
- // Always prune error messages to 1000 chars before emit
+ // Always prune error messages to 1000 characters before emit (limit defined in truncateErrorMessage)
      const prunedData = pruneEventData(data);

      await emit({
        eventType: newEventType,
        event: this.event,
        data: {
-          ...data,
+          ...prunedData,
          ...(ALLOWED_EXTRACTION_EVENT_TYPES.includes(
            this.event.payload.event_type
          )