From 10bc0375f4f43d2a588e54bee94f88ec3e762077 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ga=C5=A1per=20Zgonec?= Date: Thu, 9 Oct 2025 15:29:33 +0200 Subject: [PATCH 1/4] Implemented a crude SQS size overflow workaround. --- src/workers/worker-adapter.ts | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/workers/worker-adapter.ts b/src/workers/worker-adapter.ts index 028eb91..ffbec20 100644 --- a/src/workers/worker-adapter.ts +++ b/src/workers/worker-adapter.ts @@ -48,6 +48,8 @@ import { import { Uploader } from '../uploader/uploader'; import { Artifact, SsorAttachment } from '../uploader/uploader.interfaces'; +const MAX_MESSAGE_LENGTH: number = 200_000; + export function createWorkerAdapter({ event, adapterState, @@ -91,6 +93,9 @@ export class WorkerAdapter { private _mappers: Mappers; private uploader: Uploader; + // Length of the resulting artifact JSON object string. + private currentLength: number = 0; + constructor({ event, adapterState, @@ -149,12 +154,20 @@ export class WorkerAdapter { itemType: repo.itemType, ...(shouldNormalize && { normalize: repo.normalize }), onUpload: (artifact: Artifact) => { + let newLength = JSON.stringify(artifact).length; + // We need to store artifacts ids in state for later use when streaming attachments if (repo.itemType === AIRDROP_DEFAULT_ITEM_TYPES.ATTACHMENTS) { this.state.toDevRev?.attachmentsMetadata.artifactIds.push( artifact.id ); } + + if(this.currentLength + newLength > MAX_MESSAGE_LENGTH) { + // TODO: We need to call the adapter's `onTimeout` here and `emit` the Progress event. + // We might have to run the `uploadAllRepos` as well. + this.handleTimeout(); + } }, options: this.options, }); From 5c9be86101a75e3659eb794fc7dab208c6158015 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ga=C5=A1per=20Zgonec?= Date: Mon, 3 Nov 2025 10:28:01 +0100 Subject: [PATCH 2/4] Added handling for the size constraints. 
When the size reaches 80% of the 200KB message size limit, the "onTimeout" function is executed and the PROGRESS event is returned.
MAX_EVENT_SIZE) * 100; + const detailsString = triggerType === 'onUpload' + ? 'during data collection. Emitting progress event and stopping further processing.' + : 'during emit. Error messages truncated.'; + + console.warn( + `[SIZE_LIMIT] Event data size ${size} bytes (${percentage.toFixed(1)}% of ${MAX_EVENT_SIZE} limit) detected ${detailsString}` + ); +} + +export { MAX_EVENT_SIZE, SIZE_LIMIT_THRESHOLD }; diff --git a/src/workers/process-task.ts b/src/workers/process-task.ts index b7300b0..745d86e 100644 --- a/src/workers/process-task.ts +++ b/src/workers/process-task.ts @@ -58,6 +58,15 @@ export function processTask({ })() ); await task({ adapter }); + + // If size limit was triggered during task, call onTimeout for cleanup + if (adapter.isTimeout) { + console.log( + '[SIZE_LIMIT] Size limit detected during data collection. Executing onTimeout function for cleanup.' + ); + await onTimeout({ adapter }); + } + process.exit(0); } } catch (error) { diff --git a/src/workers/worker-adapter.ts b/src/workers/worker-adapter.ts index ffbec20..7a3677b 100644 --- a/src/workers/worker-adapter.ts +++ b/src/workers/worker-adapter.ts @@ -47,6 +47,11 @@ import { } from '../types/workers'; import { Uploader } from '../uploader/uploader'; import { Artifact, SsorAttachment } from '../uploader/uploader.interfaces'; +import { + pruneEventData, + logSizeLimitWarning, + SIZE_LIMIT_THRESHOLD, +} from '../common/event-size-monitor'; const MAX_MESSAGE_LENGTH: number = 200_000; @@ -163,10 +168,17 @@ export class WorkerAdapter { ); } - if(this.currentLength + newLength > MAX_MESSAGE_LENGTH) { - // TODO: We need to call the adapter's `onTimeout` here and `emit` the Progress event. - // We might have to run the `uploadAllRepos` as well. 
+ this.currentLength += newLength; + + // Check for size limit (80% of 200KB = 160KB threshold) + if (this.currentLength > SIZE_LIMIT_THRESHOLD && !this.hasWorkerEmitted) { + logSizeLimitWarning(this.currentLength, 'onUpload'); + + // Set timeout flag to trigger onTimeout cleanup after task completes this.handleTimeout(); + + // Emit progress event to save state and continue on next iteration + void this.emit(ExtractorEventType.ExtractionDataProgress); } }, options: this.options, @@ -259,11 +271,14 @@ export class WorkerAdapter { } try { + // Always prune error messages to 1000 chars before emit + const prunedData = pruneEventData(data); + await emit({ eventType: newEventType, event: this.event, data: { - ...data, + ...prunedData, ...(ALLOWED_EXTRACTION_EVENT_TYPES.includes( this.event.payload.event_type ) From 2a79a1eac39701b786ef17bfda63a12a95b7a819 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ga=C5=A1per=20Zgonec?= Date: Mon, 3 Nov 2025 11:17:30 +0100 Subject: [PATCH 3/4] Ran prettier --- src/common/event-size-monitor.ts | 24 ++++++++++++++++-------- src/workers/worker-adapter.ts | 19 ++++++++++--------- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/src/common/event-size-monitor.ts b/src/common/event-size-monitor.ts index 63ec1f8..9b65e07 100644 --- a/src/common/event-size-monitor.ts +++ b/src/common/event-size-monitor.ts @@ -1,5 +1,5 @@ -import { EventData } from '../types/extraction'; import { ErrorRecord } from '../types/common'; +import { EventData } from '../types/extraction'; const MAX_EVENT_SIZE = 200_000; const SIZE_LIMIT_THRESHOLD = Math.floor(MAX_EVENT_SIZE * 0.8); // 160_000 bytes @@ -29,7 +29,7 @@ export function truncateErrorMessage( if (!error) return undefined; return { - message: error.message.substring(0, maxLength) + message: error.message.substring(0, maxLength), }; } @@ -37,7 +37,9 @@ export function truncateErrorMessage( * Prune event data by truncating error messages * Always applied before serialization */ -export function 
pruneEventData(data: EventData | undefined): EventData | undefined { +export function pruneEventData( + data: EventData | undefined +): EventData | undefined { if (!data) return data; return { @@ -49,14 +51,20 @@ export function pruneEventData(data: EventData | undefined): EventData | undefin /** * Log detailed warning when size limit is detected */ -export function logSizeLimitWarning(size: number, triggerType: 'onUpload' | 'onEmit'): void { +export function logSizeLimitWarning( + size: number, + triggerType: 'onUpload' | 'onEmit' +): void { const percentage = (size / MAX_EVENT_SIZE) * 100; - const detailsString = triggerType === 'onUpload' - ? 'during data collection. Emitting progress event and stopping further processing.' - : 'during emit. Error messages truncated.'; + const detailsString = + triggerType === 'onUpload' + ? 'during data collection. Emitting progress event and stopping further processing.' + : 'during emit. Error messages truncated.'; console.warn( - `[SIZE_LIMIT] Event data size ${size} bytes (${percentage.toFixed(1)}% of ${MAX_EVENT_SIZE} limit) detected ${detailsString}` + `[SIZE_LIMIT] Event data size ${size} bytes (${percentage.toFixed( + 1 + )}% of ${MAX_EVENT_SIZE} limit) detected ${detailsString}` ); } diff --git a/src/workers/worker-adapter.ts b/src/workers/worker-adapter.ts index 7a3677b..a64706a 100644 --- a/src/workers/worker-adapter.ts +++ b/src/workers/worker-adapter.ts @@ -7,6 +7,11 @@ import { STATELESS_EVENT_TYPES, } from '../common/constants'; import { emit } from '../common/control-protocol'; +import { + logSizeLimitWarning, + pruneEventData, + SIZE_LIMIT_THRESHOLD, +} from '../common/event-size-monitor'; import { addReportToLoaderReport, getFilesToLoad } from '../common/helpers'; import { serializeError } from '../logger/logger'; import { Mappers } from '../mappers/mappers'; @@ -47,13 +52,6 @@ import { } from '../types/workers'; import { Uploader } from '../uploader/uploader'; import { Artifact, SsorAttachment } from 
'../uploader/uploader.interfaces'; -import { - pruneEventData, - logSizeLimitWarning, - SIZE_LIMIT_THRESHOLD, -} from '../common/event-size-monitor'; - -const MAX_MESSAGE_LENGTH: number = 200_000; export function createWorkerAdapter({ event, @@ -159,7 +157,7 @@ export class WorkerAdapter { itemType: repo.itemType, ...(shouldNormalize && { normalize: repo.normalize }), onUpload: (artifact: Artifact) => { - let newLength = JSON.stringify(artifact).length; + const newLength = JSON.stringify(artifact).length; // We need to store artifacts ids in state for later use when streaming attachments if (repo.itemType === AIRDROP_DEFAULT_ITEM_TYPES.ATTACHMENTS) { @@ -171,7 +169,10 @@ export class WorkerAdapter { this.currentLength += newLength; // Check for size limit (80% of 200KB = 160KB threshold) - if (this.currentLength > SIZE_LIMIT_THRESHOLD && !this.hasWorkerEmitted) { + if ( + this.currentLength > SIZE_LIMIT_THRESHOLD && + !this.hasWorkerEmitted + ) { logSizeLimitWarning(this.currentLength, 'onUpload'); // Set timeout flag to trigger onTimeout cleanup after task completes From 5b3ff6bbe0a7a6a6cb78486479f6e6d4ca75905a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ga=C5=A1per=20Zgonec?= Date: Tue, 4 Nov 2025 08:25:56 +0100 Subject: [PATCH 4/4] Added units and united constant namings. 
--- src/common/event-size-monitor.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/common/event-size-monitor.ts b/src/common/event-size-monitor.ts index 9b65e07..a60a301 100644 --- a/src/common/event-size-monitor.ts +++ b/src/common/event-size-monitor.ts @@ -1,8 +1,8 @@ import { ErrorRecord } from '../types/common'; import { EventData } from '../types/extraction'; -const MAX_EVENT_SIZE = 200_000; -const SIZE_LIMIT_THRESHOLD = Math.floor(MAX_EVENT_SIZE * 0.8); // 160_000 bytes +const MAX_EVENT_SIZE_BYTES = 200_000; +const EVENT_SIZE_THRESHOLD_BYTES = Math.floor(MAX_EVENT_SIZE_BYTES * 0.8); // 160_000 bytes /** * Get the JSON serialized size of event data in bytes @@ -16,7 +16,7 @@ export function getEventDataSize(data: EventData | undefined): number { * Check if event data exceeds the 80% threshold (160KB) */ export function shouldTriggerSizeLimit(data: EventData | undefined): boolean { - return getEventDataSize(data) > SIZE_LIMIT_THRESHOLD; + return getEventDataSize(data) > EVENT_SIZE_THRESHOLD_BYTES; } /** @@ -55,7 +55,7 @@ export function logSizeLimitWarning( size: number, triggerType: 'onUpload' | 'onEmit' ): void { - const percentage = (size / MAX_EVENT_SIZE) * 100; + const percentage = (size / MAX_EVENT_SIZE_BYTES) * 100; const detailsString = triggerType === 'onUpload' ? 'during data collection. Emitting progress event and stopping further processing.' @@ -64,8 +64,8 @@ export function logSizeLimitWarning( console.warn( `[SIZE_LIMIT] Event data size ${size} bytes (${percentage.toFixed( 1 - )}% of ${MAX_EVENT_SIZE} limit) detected ${detailsString}` + )}% of ${MAX_EVENT_SIZE_BYTES} limit) detected ${detailsString}` ); } -export { MAX_EVENT_SIZE, SIZE_LIMIT_THRESHOLD }; +export { MAX_EVENT_SIZE_BYTES as MAX_EVENT_SIZE, EVENT_SIZE_THRESHOLD_BYTES as SIZE_LIMIT_THRESHOLD };