diff --git a/.eslintignore b/.eslintignore deleted file mode 100644 index f06235c..0000000 --- a/.eslintignore +++ /dev/null @@ -1,2 +0,0 @@ -node_modules -dist diff --git a/eslint.config.mts b/eslint.config.mts index cb30574..c785f01 100644 --- a/eslint.config.mts +++ b/eslint.config.mts @@ -6,7 +6,7 @@ import globals from 'globals'; import tseslint from 'typescript-eslint'; export default defineConfig([ - { ignores: ['dist/**', 'test/**', 'src/deprecated/**'] }, + { ignores: ['dist/**', 'test/**', 'src/deprecated/**', 'coverage'] }, // JavaScript files (CommonJS) - for worker.js and config files { diff --git a/package-lock.json b/package-lock.json index 906e24e..b830d16 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@devrev/ts-adaas", - "version": "1.12.2", + "version": "1.12.3-beta.1", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@devrev/ts-adaas", - "version": "1.12.2", + "version": "1.12.3-beta.1", "license": "ISC", "dependencies": { "@devrev/typescript-sdk": "^1.1.59", diff --git a/package.json b/package.json index c0bccab..ebd0b61 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@devrev/ts-adaas", - "version": "1.12.2", + "version": "1.12.3-beta.1", "description": "Typescript library containing the ADaaS(AirDrop as a Service) control protocol.", "type": "commonjs", "main": "./dist/index.js", diff --git a/src/common/constants.ts b/src/common/constants.ts index 76665cf..4da3c3f 100644 --- a/src/common/constants.ts +++ b/src/common/constants.ts @@ -2,14 +2,14 @@ import { EventType } from '../types/extraction'; import { getLibraryVersion } from './helpers'; export const ALLOWED_EXTRACTION_EVENT_TYPES = [ - EventType.ExtractionExternalSyncUnitsStart, - EventType.ExtractionMetadataStart, - EventType.ExtractionDataStart, - EventType.ExtractionDataContinue, - EventType.ExtractionDataDelete, - EventType.ExtractionAttachmentsStart, - EventType.ExtractionAttachmentsContinue, - EventType.ExtractionAttachmentsDelete, + EventType.StartExtractingExternalSyncUnits, + EventType.StartExtractingMetadata, + EventType.StartExtractingData, + EventType.ContinueExtractingData, + EventType.StartDeletingExtractorState, + EventType.StartExtractingAttachments, + EventType.ContinueExtractingAttachments, + EventType.StartDeletingExtractorAttachmentsState, ]; export const ALLOWED_LOADING_EVENT_TYPES = [ @@ -25,9 +25,9 @@ export const ALLOWED_EVENT_TYPES = [ ]; export const STATELESS_EXTRACTION_EVENT_TYPES = [ - EventType.ExtractionExternalSyncUnitsStart, - EventType.ExtractionDataDelete, - EventType.ExtractionAttachmentsDelete, + EventType.StartExtractingExternalSyncUnits, + EventType.StartDeletingExtractorState, + EventType.StartDeletingExtractorAttachmentsState, ]; export const STATELESS_LOADING_EVENT_TYPES = [ diff --git a/src/common/control-protocol.ts b/src/common/control-protocol.ts index 383e785..c372c91 100644 --- a/src/common/control-protocol.ts +++ b/src/common/control-protocol.ts @@ -9,6 +9,7 @@ import { } from '../types/extraction'; import { LoaderEventType } from '../types/loading'; import { LIBRARY_VERSION } from './constants'; +import { translateOutgoingEventType } from './event-type-translation'; export interface EmitInterface { event: AirdropEvent; @@ -21,8 +22,12 @@ export const emit = async ({ eventType, data, }: EmitInterface): Promise => { + // Translate outgoing event type to ensure we always send new event types + // TODO: Remove when the old types are completely phased out + const translatedEventType = translateOutgoingEventType(eventType); + const newEvent: ExtractorEvent | LoaderEvent = { - event_type: eventType, + event_type: translatedEventType, event_context: event.payload.event_context, event_data: { ...data, diff --git a/src/common/event-type-translation.ts b/src/common/event-type-translation.ts new file mode 100644 index 0000000..7d960b5 --- /dev/null +++ b/src/common/event-type-translation.ts @@ -0,0 +1,163 @@ +import { EventType, ExtractorEventType } from '../types/extraction'; +import { LoaderEventType } from '../types/loading'; + +/** + * Maps old incoming event type strings to new EventType enum values. + * This ensures backwards compatibility when the platform sends old event types. + * @param eventTypeString The raw event type string from the platform + * @returns The translated EventType enum value + */ +export function translateIncomingEventType(eventTypeString: string): EventType { + // Create a reverse mapping from OLD string values to NEW enum member names + const eventTypeMap: Record = { + // Old extraction event types from platform -> New enum members + [EventType.ExtractionExternalSyncUnitsStart]: + EventType.StartExtractingExternalSyncUnits, + [EventType.ExtractionMetadataStart]: EventType.StartExtractingMetadata, + [EventType.ExtractionDataStart]: EventType.StartExtractingData, + [EventType.ExtractionDataContinue]: EventType.ContinueExtractingData, + [EventType.ExtractionDataDelete]: EventType.StartDeletingExtractorState, + [EventType.ExtractionAttachmentsStart]: + EventType.StartExtractingAttachments, + [EventType.ExtractionAttachmentsContinue]: + EventType.ContinueExtractingAttachments, + [EventType.ExtractionAttachmentsDelete]: + EventType.StartDeletingExtractorAttachmentsState, + + // New extraction event types (already correct, map to new enum members) + [EventType.StartExtractingExternalSyncUnits]: + EventType.StartExtractingExternalSyncUnits, + [EventType.StartExtractingMetadata]: EventType.StartExtractingMetadata, + [EventType.StartExtractingData]: EventType.StartExtractingData, + [EventType.ContinueExtractingData]: EventType.ContinueExtractingData, + [EventType.StartDeletingExtractorState]: + EventType.StartDeletingExtractorState, + [EventType.StartExtractingAttachments]: + EventType.StartExtractingAttachments, + [EventType.ContinueExtractingAttachments]: + EventType.ContinueExtractingAttachments, + [EventType.StartDeletingExtractorAttachmentsState]: + EventType.StartDeletingExtractorAttachmentsState, + + // Loading events + [EventType.StartLoadingData]: EventType.StartLoadingData, + [EventType.ContinueLoadingData]: EventType.ContinueLoadingData, + [EventType.StartLoadingAttachments]: EventType.StartLoadingAttachments, + [EventType.ContinueLoadingAttachments]: + EventType.ContinueLoadingAttachments, + [EventType.StartDeletingLoaderState]: EventType.StartDeletingLoaderState, + [EventType.StartDeletingLoaderAttachmentState]: + EventType.StartDeletingLoaderAttachmentState, + + // Unknown + [EventType.UnknownEventType]: EventType.UnknownEventType, + }; + + const translated = eventTypeMap[eventTypeString]; + if (!translated) { + console.warn( + `Unknown event type received: ${eventTypeString}. This may indicate a new event type or a typo.` + ); + // Return the original string cast as EventType as a fallback + return eventTypeString as EventType; + } + + return translated; +} + +/** + * Translates ExtractorEventType enum values by converting old enum members to new ones. + * Old enum members are deprecated and should be replaced with new ones. + */ +export function translateExtractorEventType( + eventType: ExtractorEventType +): ExtractorEventType { + // Map old enum members to new enum members + const stringValue = eventType as string; + + const mapping: Record = { + // Old string values -> New enum members + [ExtractorEventType.ExtractionExternalSyncUnitsDone]: + ExtractorEventType.ExternalSyncUnitExtractionDone, + [ExtractorEventType.ExtractionExternalSyncUnitsError]: + ExtractorEventType.ExternalSyncUnitExtractionError, + [ExtractorEventType.ExtractionMetadataDone]: + ExtractorEventType.MetadataExtractionDone, + [ExtractorEventType.ExtractionMetadataError]: + ExtractorEventType.MetadataExtractionError, + [ExtractorEventType.ExtractionDataProgress]: + ExtractorEventType.DataExtractionProgress, + [ExtractorEventType.ExtractionDataDelay]: + ExtractorEventType.DataExtractionDelayed, + [ExtractorEventType.ExtractionDataDone]: + ExtractorEventType.DataExtractionDone, + [ExtractorEventType.ExtractionDataError]: + ExtractorEventType.DataExtractionError, + [ExtractorEventType.ExtractionDataDeleteDone]: + ExtractorEventType.ExtractorStateDeletionDone, + [ExtractorEventType.ExtractionDataDeleteError]: + ExtractorEventType.ExtractorStateDeletionError, + [ExtractorEventType.ExtractionAttachmentsProgress]: + ExtractorEventType.AttachmentExtractionProgress, + [ExtractorEventType.ExtractionAttachmentsDelay]: + ExtractorEventType.AttachmentExtractionDelayed, + [ExtractorEventType.ExtractionAttachmentsDone]: + ExtractorEventType.AttachmentExtractionDone, + [ExtractorEventType.ExtractionAttachmentsError]: + ExtractorEventType.AttachmentExtractionError, + [ExtractorEventType.ExtractionAttachmentsDeleteDone]: + ExtractorEventType.ExtractorAttachmentsStateDeletionDone, + [ExtractorEventType.ExtractionAttachmentsDeleteError]: + ExtractorEventType.ExtractorAttachmentsStateDeletionError, + }; + + // If there's a mapping, use it; otherwise return original (already new) + return mapping[stringValue] ?? eventType; +} + +/** + * Translates LoaderEventType enum values by converting old enum members to new ones. + * Old enum members are deprecated and should be replaced with new ones. + */ +export function translateLoaderEventType( + eventType: LoaderEventType +): LoaderEventType { + // Map old enum members to new enum members + const stringValue = eventType as string; + + const mapping: Record = { + // Old string values -> New enum members + [LoaderEventType.DataLoadingDelay]: LoaderEventType.DataLoadingDelayed, + [LoaderEventType.AttachmentLoadingProgress]: + LoaderEventType.AttachmentsLoadingProgress, + [LoaderEventType.AttachmentLoadingDelayed]: + LoaderEventType.AttachmentsLoadingDelayed, + [LoaderEventType.AttachmentLoadingDone]: + LoaderEventType.AttachmentsLoadingDone, + [LoaderEventType.AttachmentLoadingError]: + LoaderEventType.AttachmentsLoadingError, + }; + + // If there's a mapping, use it; otherwise return original (already new) + return mapping[stringValue] ?? eventType; +} + +/** + * Translates any outgoing event type (Extractor or Loader) to ensure new event types are used. + */ +export function translateOutgoingEventType( + eventType: ExtractorEventType | LoaderEventType +): ExtractorEventType | LoaderEventType { + // Check if it's an ExtractorEventType by checking if the value exists in ExtractorEventType + if ( + Object.values(ExtractorEventType).includes(eventType as ExtractorEventType) + ) { + return translateExtractorEventType(eventType as ExtractorEventType); + } + // Otherwise treat as LoaderEventType + if (Object.values(LoaderEventType).includes(eventType as LoaderEventType)) { + return translateLoaderEventType(eventType as LoaderEventType); + } + // If neither, return as-is + return eventType; +} diff --git a/src/common/helpers.ts b/src/common/helpers.ts index f4aa775..81caa2a 100644 --- a/src/common/helpers.ts +++ b/src/common/helpers.ts @@ -23,55 +23,73 @@ export function getTimeoutErrorEventType(eventType: EventType): { eventType: ExtractorEventType | LoaderEventType; } { switch (eventType) { + // Metadata extraction (handles both old and new enum members) + case EventType.StartExtractingMetadata: case EventType.ExtractionMetadataStart: return { - eventType: ExtractorEventType.ExtractionMetadataError, + eventType: ExtractorEventType.MetadataExtractionError, }; + // Data extraction (handles both old and new enum members) + case EventType.StartExtractingData: + case EventType.ContinueExtractingData: case EventType.ExtractionDataStart: case EventType.ExtractionDataContinue: return { - eventType: ExtractorEventType.ExtractionDataError, + eventType: ExtractorEventType.DataExtractionError, }; + // Data deletion (handles both old and new enum members) + case EventType.StartDeletingExtractorState: case EventType.ExtractionDataDelete: return { - eventType: ExtractorEventType.ExtractionDataDeleteError, + eventType: ExtractorEventType.ExtractorStateDeletionError, }; + // Attachments extraction (handles both old and new enum members) + case EventType.StartExtractingAttachments: + case EventType.ContinueExtractingAttachments: case EventType.ExtractionAttachmentsStart: case EventType.ExtractionAttachmentsContinue: return { - eventType: ExtractorEventType.ExtractionAttachmentsError, + eventType: ExtractorEventType.AttachmentExtractionError, }; + // Attachments deletion (handles both old and new enum members) + case EventType.StartDeletingExtractorAttachmentsState: case EventType.ExtractionAttachmentsDelete: return { - eventType: ExtractorEventType.ExtractionAttachmentsDeleteError, + eventType: ExtractorEventType.ExtractorAttachmentsStateDeletionError, }; + // External sync units (handles both old and new enum members) + case EventType.StartExtractingExternalSyncUnits: case EventType.ExtractionExternalSyncUnitsStart: return { - eventType: ExtractorEventType.ExtractionExternalSyncUnitsError, + eventType: ExtractorEventType.ExternalSyncUnitExtractionError, }; + // Loading data case EventType.StartLoadingData: case EventType.ContinueLoadingData: return { eventType: LoaderEventType.DataLoadingError, }; + // Deleting loader state case EventType.StartDeletingLoaderState: return { eventType: LoaderEventType.LoaderStateDeletionError, }; + // Loading attachments case EventType.StartLoadingAttachments: case EventType.ContinueLoadingAttachments: return { - eventType: LoaderEventType.AttachmentLoadingError, + eventType: LoaderEventType.AttachmentsLoadingError, }; + // Deleting loader attachment state case EventType.StartDeletingLoaderAttachmentState: return { eventType: LoaderEventType.LoaderAttachmentStateDeletionError, diff --git a/src/deprecated/adapter/index.ts b/src/deprecated/adapter/index.ts index 4360bcd..83e7dba 100644 --- a/src/deprecated/adapter/index.ts +++ b/src/deprecated/adapter/index.ts @@ -14,6 +14,7 @@ import { STATELESS_EVENT_TYPES } from '../../common/constants'; import { getTimeoutExtractorEventType } from '../common/helpers'; // import { Logger } from '../../logger/logger'; import { State, createAdapterState } from '../../state/state'; +import { translateIncomingEventType } from '../../common/event-type-translation'; /** * Adapter class is used to interact with Airdrop platform. The class provides @@ -44,6 +45,8 @@ export async function createAdapter( initialState: ConnectorState, isLocalDevelopment: boolean = false ) { + event.payload.event_type = translateIncomingEventType(event.payload.event_type); + const newInitialState = structuredClone(initialState); const adapterState: State = await createAdapterState({ event, diff --git a/src/state/state.test.ts b/src/state/state.test.ts index 122d985..b5dc301 100644 --- a/src/state/state.test.ts +++ b/src/state/state.test.ts @@ -126,7 +126,7 @@ describe(State.name, () => { it.each( STATEFUL_EVENT_TYPES.filter( - (eventType) => eventType !== EventType.ExtractionDataStart + (eventType) => eventType !== EventType.StartExtractingData ) )( 'should call post state with full adapter state if fetching returns 404 for event type %s', @@ -168,13 +168,13 @@ describe(State.name, () => { } ); - it(EventType.ExtractionDataStart, async () => { + it(EventType.StartExtractingData, async () => { // Arrange const initialState = { test: 'test', }; const event = createEvent({ - eventType: EventType.ExtractionDataStart, + eventType: EventType.StartExtractingData, contextOverrides: { snap_in_version_id: '', }, diff --git a/src/state/state.ts b/src/state/state.ts index 09b6ada..9f75c5f 100644 --- a/src/state/state.ts +++ b/src/state/state.ts @@ -68,9 +68,9 @@ export async function createAdapterState({ } } - // Set lastSyncStarted if the event type is ExtractionDataStart + // Set lastSyncStarted if the event type is StartExtractingData if ( - event.payload.event_type === EventType.ExtractionDataStart && + event.payload.event_type === EventType.StartExtractingData && !as.state.lastSyncStarted ) { as.state.lastSyncStarted = new Date().toISOString(); diff --git a/src/tests/timeout-handling/timeout-1.test.ts b/src/tests/timeout-handling/timeout-1.test.ts index 062625c..29e2a18 100644 --- a/src/tests/timeout-handling/timeout-1.test.ts +++ b/src/tests/timeout-handling/timeout-1.test.ts @@ -1,4 +1,4 @@ -import { EventType } from '../../types/extraction'; +import { EventType, ExtractorEventType } from '../../types/extraction'; import { MockServer } from '../mock-server'; import { createEvent } from '../test-helpers'; import run from './extraction'; @@ -43,6 +43,6 @@ describe('timeout-1 extraction', () => { // Expect last request to be emission of done event expect(lastRequest.url).toContain('airdrop.external-extractor.message'); expect(lastRequest.method).toBe('POST'); - expect(lastRequest.body.event_type).toBe('EXTRACTION_DATA_DONE'); + expect(lastRequest.body.event_type).toBe(ExtractorEventType.DataExtractionDone); }); }); diff --git a/src/tests/timeout-handling/timeout-1.ts b/src/tests/timeout-handling/timeout-1.ts index 465677b..119ebdf 100644 --- a/src/tests/timeout-handling/timeout-1.ts +++ b/src/tests/timeout-handling/timeout-1.ts @@ -6,9 +6,9 @@ processTask({ console.log('timeout-1 iteration', i); } - await adapter.emit(ExtractorEventType.ExtractionDataDone); + await adapter.emit(ExtractorEventType.DataExtractionDone); }, onTimeout: async ({ adapter }) => { - await adapter.emit(ExtractorEventType.ExtractionDataProgress); + await adapter.emit(ExtractorEventType.DataExtractionProgress); }, }); diff --git a/src/tests/timeout-handling/timeout-2.test.ts b/src/tests/timeout-handling/timeout-2.test.ts index 1eab2e8..b1ed7f6 100644 --- a/src/tests/timeout-handling/timeout-2.test.ts +++ b/src/tests/timeout-handling/timeout-2.test.ts @@ -1,4 +1,4 @@ -import { EventType } from '../../types/extraction'; +import { EventType, ExtractorEventType } from '../../types/extraction'; import { MockServer } from '../mock-server'; import { createEvent } from '../test-helpers'; import run from './extraction'; @@ -45,6 +45,6 @@ describe('timeout-2 extraction', () => { // Expect last request to be emission of progress event expect(lastRequest.url).toContain('airdrop.external-extractor.message'); expect(lastRequest.method).toBe('POST'); - expect(lastRequest.body.event_type).toBe('EXTRACTION_DATA_PROGRESS'); + expect(lastRequest.body.event_type).toBe(ExtractorEventType.DataExtractionProgress); }); }); diff --git a/src/tests/timeout-handling/timeout-2.ts b/src/tests/timeout-handling/timeout-2.ts index 46b3e89..602b2d3 100644 --- a/src/tests/timeout-handling/timeout-2.ts +++ b/src/tests/timeout-handling/timeout-2.ts @@ -38,13 +38,13 @@ processTask({ } console.log('All network requests completed successfully'); - await adapter.emit(ExtractorEventType.ExtractionDataDone); + await adapter.emit(ExtractorEventType.DataExtractionDone); } catch (error) { console.error('Network request failed:', error); - await adapter.emit(ExtractorEventType.ExtractionDataDone); + await adapter.emit(ExtractorEventType.DataExtractionDone); } }, onTimeout: async ({ adapter }) => { - await adapter.emit(ExtractorEventType.ExtractionDataProgress); + await adapter.emit(ExtractorEventType.DataExtractionProgress); }, }); diff --git a/src/tests/timeout-handling/timeout-3a.test.ts b/src/tests/timeout-handling/timeout-3a.test.ts index a40fcd6..06d897f 100644 --- a/src/tests/timeout-handling/timeout-3a.test.ts +++ b/src/tests/timeout-handling/timeout-3a.test.ts @@ -1,4 +1,4 @@ -import { EventType } from '../../types/extraction'; +import { EventType, ExtractorEventType } from '../../types/extraction'; import { MockServer } from '../mock-server'; import { createEvent } from '../test-helpers'; import run from './extraction'; @@ -45,6 +45,6 @@ describe('timeout-3a extraction', () => { // Expect last request to be emission of error event since we force-kill the worker expect(lastRequest.url).toContain('airdrop.external-extractor.message'); expect(lastRequest.method).toBe('POST'); - expect(lastRequest.body.event_type).toBe('EXTRACTION_DATA_ERROR'); + expect(lastRequest.body.event_type).toBe(ExtractorEventType.DataExtractionError); }); }); diff --git a/src/tests/timeout-handling/timeout-3a.ts b/src/tests/timeout-handling/timeout-3a.ts index 63a1c47..cd45ae5 100644 --- a/src/tests/timeout-handling/timeout-3a.ts +++ b/src/tests/timeout-handling/timeout-3a.ts @@ -32,9 +32,9 @@ processTask({ } console.log(`Final computation result: ${result}`); - await adapter.emit(ExtractorEventType.ExtractionDataDone); + await adapter.emit(ExtractorEventType.DataExtractionDone); }, onTimeout: async ({ adapter }) => { - await adapter.emit(ExtractorEventType.ExtractionDataProgress); + await adapter.emit(ExtractorEventType.DataExtractionProgress); }, }); diff --git a/src/tests/timeout-handling/timeout-3b.test.ts b/src/tests/timeout-handling/timeout-3b.test.ts index 152984c..b533b83 100644 --- a/src/tests/timeout-handling/timeout-3b.test.ts +++ b/src/tests/timeout-handling/timeout-3b.test.ts @@ -1,4 +1,4 @@ -import { EventType } from '../../types/extraction'; +import { EventType, ExtractorEventType } from '../../types/extraction'; import { MockServer } from '../mock-server'; import { createEvent } from '../test-helpers'; import run from './extraction'; @@ -45,6 +45,6 @@ describe('timeout-3b extraction', () => { // Expect last request to be emission of progress event expect(lastRequest.url).toContain('airdrop.external-extractor.message'); expect(lastRequest.method).toBe('POST'); - expect(lastRequest.body.event_type).toBe('EXTRACTION_DATA_PROGRESS'); + expect(lastRequest.body.event_type).toBe(ExtractorEventType.DataExtractionProgress); }); }); diff --git a/src/tests/timeout-handling/timeout-3b.ts b/src/tests/timeout-handling/timeout-3b.ts index 8b4ed39..84eaf41 100644 --- a/src/tests/timeout-handling/timeout-3b.ts +++ b/src/tests/timeout-handling/timeout-3b.ts @@ -33,9 +33,9 @@ processTask({ } console.log(`Final computation result: ${result}`); - await adapter.emit(ExtractorEventType.ExtractionDataDone); + await adapter.emit(ExtractorEventType.DataExtractionDone); }, onTimeout: async ({ adapter }) => { - await adapter.emit(ExtractorEventType.ExtractionDataProgress); + await adapter.emit(ExtractorEventType.DataExtractionProgress); }, }); diff --git a/src/types/extraction.ts b/src/types/extraction.ts index df5ae82..4196030 100644 --- a/src/types/extraction.ts +++ b/src/types/extraction.ts @@ -14,14 +14,38 @@ import { DonV2, LoaderReport, RateLimited } from './loading'; * The external extractor can use these events to know what to do next in the extraction process. */ export enum EventType { - // Extraction + // Extraction - Old member names with OLD values (deprecated, kept for backwards compatibility) + /** + * @deprecated Use StartExtractingExternalSyncUnits instead + */ ExtractionExternalSyncUnitsStart = 'EXTRACTION_EXTERNAL_SYNC_UNITS_START', + /** + * @deprecated Use StartExtractingMetadata instead + */ ExtractionMetadataStart = 'EXTRACTION_METADATA_START', + /** + * @deprecated Use StartExtractingData instead + */ ExtractionDataStart = 'EXTRACTION_DATA_START', + /** + * @deprecated Use ContinueExtractingData instead + */ ExtractionDataContinue = 'EXTRACTION_DATA_CONTINUE', + /** + * @deprecated Use StartDeletingExtractorState instead + */ ExtractionDataDelete = 'EXTRACTION_DATA_DELETE', + /** + * @deprecated Use StartExtractingAttachments instead + */ ExtractionAttachmentsStart = 'EXTRACTION_ATTACHMENTS_START', + /** + * @deprecated Use ContinueExtractingAttachments instead + */ ExtractionAttachmentsContinue = 'EXTRACTION_ATTACHMENTS_CONTINUE', + /** + * @deprecated Use StartDeletingExtractorAttachmentsState instead + */ ExtractionAttachmentsDelete = 'EXTRACTION_ATTACHMENTS_DELETE', // Loading @@ -31,6 +55,19 @@ export enum EventType { ContinueLoadingAttachments = 'CONTINUE_LOADING_ATTACHMENTS', StartDeletingLoaderState = 'START_DELETING_LOADER_STATE', StartDeletingLoaderAttachmentState = 'START_DELETING_LOADER_ATTACHMENT_STATE', + + // Unknown + UnknownEventType = 'UNKNOWN_EVENT_TYPE', + + // Extraction - New member names with NEW values (preferred) + StartExtractingExternalSyncUnits = 'START_EXTRACTING_EXTERNAL_SYNC_UNITS', + StartExtractingMetadata = 'START_EXTRACTING_METADATA', + StartExtractingData = 'START_EXTRACTING_DATA', + ContinueExtractingData = 'CONTINUE_EXTRACTING_DATA', + StartDeletingExtractorState = 'START_DELETING_EXTRACTOR_STATE', + StartExtractingAttachments = 'START_EXTRACTING_ATTACHMENTS', + ContinueExtractingAttachments = 'CONTINUE_EXTRACTING_ATTACHMENTS', + StartDeletingExtractorAttachmentsState = 'START_DELETING_EXTRACTOR_ATTACHMENTS_STATE', } /** @@ -38,26 +75,92 @@ export enum EventType { * The external extractor can use these events to inform ADaaS about the progress of the extraction process. */ export enum ExtractorEventType { - // Extraction + // Extraction - Old member names with OLD values (deprecated, kept for backwards compatibility) + /** + * @deprecated Use ExternalSyncUnitExtractionDone instead + */ ExtractionExternalSyncUnitsDone = 'EXTRACTION_EXTERNAL_SYNC_UNITS_DONE', + /** + * @deprecated Use ExternalSyncUnitExtractionError instead + */ ExtractionExternalSyncUnitsError = 'EXTRACTION_EXTERNAL_SYNC_UNITS_ERROR', + /** + * @deprecated Use MetadataExtractionDone instead + */ ExtractionMetadataDone = 'EXTRACTION_METADATA_DONE', + /** + * @deprecated Use MetadataExtractionError instead + */ ExtractionMetadataError = 'EXTRACTION_METADATA_ERROR', + /** + * @deprecated Use DataExtractionProgress instead + */ ExtractionDataProgress = 'EXTRACTION_DATA_PROGRESS', + /** + * @deprecated Use DataExtractionDelayed instead + */ ExtractionDataDelay = 'EXTRACTION_DATA_DELAY', + /** + * @deprecated Use DataExtractionDone instead + */ ExtractionDataDone = 'EXTRACTION_DATA_DONE', + /** + * @deprecated Use DataExtractionError instead + */ ExtractionDataError = 'EXTRACTION_DATA_ERROR', + /** + * @deprecated Use ExtractorStateDeletionDone instead + */ ExtractionDataDeleteDone = 'EXTRACTION_DATA_DELETE_DONE', + /** + * @deprecated Use ExtractorStateDeletionError instead + */ ExtractionDataDeleteError = 'EXTRACTION_DATA_DELETE_ERROR', + /** + * @deprecated Use AttachmentExtractionProgress instead + */ ExtractionAttachmentsProgress = 'EXTRACTION_ATTACHMENTS_PROGRESS', + /** + * @deprecated Use AttachmentExtractionDelayed instead + */ ExtractionAttachmentsDelay = 'EXTRACTION_ATTACHMENTS_DELAY', + /** + * @deprecated Use AttachmentExtractionDone instead + */ ExtractionAttachmentsDone = 'EXTRACTION_ATTACHMENTS_DONE', + /** + * @deprecated Use AttachmentExtractionError instead + */ ExtractionAttachmentsError = 'EXTRACTION_ATTACHMENTS_ERROR', + /** + * @deprecated Use ExtractorAttachmentsStateDeletionDone instead + */ ExtractionAttachmentsDeleteDone = 'EXTRACTION_ATTACHMENTS_DELETE_DONE', + /** + * @deprecated Use ExtractorAttachmentsStateDeletionError instead + */ ExtractionAttachmentsDeleteError = 'EXTRACTION_ATTACHMENTS_DELETE_ERROR', // Unknown UnknownEventType = 'UNKNOWN_EVENT_TYPE', + + // Extraction - New member names with NEW values (preferred) + ExternalSyncUnitExtractionDone = 'EXTERNAL_SYNC_UNIT_EXTRACTION_DONE', + ExternalSyncUnitExtractionError = 'EXTERNAL_SYNC_UNIT_EXTRACTION_ERROR', + MetadataExtractionDone = 'METADATA_EXTRACTION_DONE', + MetadataExtractionError = 'METADATA_EXTRACTION_ERROR', + DataExtractionProgress = 'DATA_EXTRACTION_PROGRESS', + DataExtractionDelayed = 'DATA_EXTRACTION_DELAYED', + DataExtractionDone = 'DATA_EXTRACTION_DONE', + DataExtractionError = 'DATA_EXTRACTION_ERROR', + ExtractorStateDeletionDone = 'EXTRACTOR_STATE_DELETION_DONE', + ExtractorStateDeletionError = 'EXTRACTOR_STATE_DELETION_ERROR', + AttachmentExtractionProgress = 'ATTACHMENT_EXTRACTION_PROGRESS', + AttachmentExtractionDelayed = 'ATTACHMENT_EXTRACTION_DELAYED', + AttachmentExtractionDone = 'ATTACHMENT_EXTRACTION_DONE', + AttachmentExtractionError = 'ATTACHMENT_EXTRACTION_ERROR', + ExtractorAttachmentsStateDeletionDone = 'EXTRACTOR_ATTACHMENTS_STATE_DELETION_DONE', + ExtractorAttachmentsStateDeletionError = 'EXTRACTOR_ATTACHMENTS_STATE_DELETION_ERROR', } /** diff --git a/src/types/loading.ts b/src/types/loading.ts index 6031f0d..9e5e26c 100644 --- a/src/types/loading.ts +++ b/src/types/loading.ts @@ -135,18 +135,42 @@ export type SyncMapperRecord = { input_file?: string; }; +/* eslint-disable @typescript-eslint/no-duplicate-enum-values */ export enum LoaderEventType { + // Old member names with OLD values (deprecated, but kept for backwards compatibility) DataLoadingProgress = 'DATA_LOADING_PROGRESS', + /** + * @deprecated This was a typo. Use DataLoadingDelayed for the corrected spelling + */ DataLoadingDelay = 'DATA_LOADING_DELAYED', DataLoadingDone = 'DATA_LOADING_DONE', DataLoadingError = 'DATA_LOADING_ERROR', + /** + * @deprecated Use AttachmentsLoadingProgress instead (note: singular changed to plural) + */ AttachmentLoadingProgress = 'ATTACHMENT_LOADING_PROGRESS', + /** + * @deprecated Use AttachmentsLoadingDelayed instead (note: singular changed to plural) + */ AttachmentLoadingDelayed = 'ATTACHMENT_LOADING_DELAYED', + /** + * @deprecated Use AttachmentsLoadingDone instead (note: singular changed to plural) + */ AttachmentLoadingDone = 'ATTACHMENT_LOADING_DONE', + /** + * @deprecated Use AttachmentsLoadingError instead (note: singular changed to plural) + */ AttachmentLoadingError = 'ATTACHMENT_LOADING_ERROR', LoaderStateDeletionDone = 'LOADER_STATE_DELETION_DONE', LoaderStateDeletionError = 'LOADER_STATE_DELETION_ERROR', LoaderAttachmentStateDeletionDone = 'LOADER_ATTACHMENT_STATE_DELETION_DONE', LoaderAttachmentStateDeletionError = 'LOADER_ATTACHMENT_STATE_DELETION_ERROR', UnknownEventType = 'UNKNOWN_EVENT_TYPE', + + // New member names with NEW values (preferred) + DataLoadingDelayed = 'DATA_LOADING_DELAYED', + AttachmentsLoadingProgress = 'ATTACHMENTS_LOADING_PROGRESS', + AttachmentsLoadingDelayed = 'ATTACHMENTS_LOADING_DELAYED', + AttachmentsLoadingDone = 'ATTACHMENTS_LOADING_DONE', + AttachmentsLoadingError = 'ATTACHMENTS_LOADING_ERROR', } diff --git a/src/types/workers.ts b/src/types/workers.ts index 51e2780..f74ab84 100644 --- a/src/types/workers.ts +++ b/src/types/workers.ts @@ -3,7 +3,7 @@ import { Worker } from 'worker_threads'; import { State } from '../state/state'; import { WorkerAdapter } from '../workers/worker-adapter'; -import { AirdropEvent, ExtractorEventType } from './extraction'; +import { AirdropEvent, EventType, ExtractorEventType } from './extraction'; import { LoaderEventType } from './loading'; @@ -30,11 +30,15 @@ export interface WorkerAdapterInterface { * @param {boolean=} isLocalDevelopment - A flag to indicate if the adapter is being used in local development * @param {number=} timeout - The timeout for the worker thread * @param {number=} batchSize - Maximum number of extracted items in a batch + * @param {string=} baseWorkerPath - The base path for the worker files, usually `__dirname` + * @param {Record=} workerPathOverrides - A map of event types to custom worker paths to override default worker paths */ export interface WorkerAdapterOptions { isLocalDevelopment?: boolean; timeout?: number; batchSize?: number; + baseWorkerPath?: string; + workerPathOverrides?: Partial>; } /** @@ -152,5 +156,5 @@ export interface WorkerData { */ export interface GetWorkerPathInterface { event: AirdropEvent; - connectorWorkerPath?: string | null; + workerBasePath?: string | null; } diff --git a/src/workers/default-workers/attachments-deletion.ts b/src/workers/default-workers/attachments-deletion.ts deleted file mode 100644 index c741586..0000000 --- a/src/workers/default-workers/attachments-deletion.ts +++ /dev/null @@ -1,12 +0,0 @@ -import { ExtractorEventType, processTask } from '../../index'; - -processTask({ - task: async ({ adapter }) => { - await adapter.emit(ExtractorEventType.ExtractionAttachmentsDeleteDone); - }, - onTimeout: async ({ adapter }) => { - await adapter.emit(ExtractorEventType.ExtractionAttachmentsDeleteError, { - error: { message: 'Failed to delete attachments. Lambda timeout.' }, - }); - }, -}); diff --git a/src/workers/default-workers/attachments-extraction.ts b/src/workers/default-workers/attachments-extraction.ts deleted file mode 100644 index bbac524..0000000 --- a/src/workers/default-workers/attachments-extraction.ts +++ /dev/null @@ -1,102 +0,0 @@ -import axios, { AxiosResponse } from 'axios'; -import { MAX_DEVREV_ARTIFACT_SIZE } from '../../common/constants'; -import { ExtractorEventType, processTask } from '../../index'; -import { - ExternalSystemAttachmentStreamingParams, - ExternalSystemAttachmentStreamingResponse, -} from '../../types/extraction'; - -const getAttachmentStream = async ({ - item, -}: ExternalSystemAttachmentStreamingParams): Promise => { - const { id, url } = item; - let fileStreamResponse: AxiosResponse | undefined; - - try { - // Get the stream response directly - fileStreamResponse = await axios.get(url, { - responseType: 'stream', - headers: { - 'Accept-Encoding': 'identity', - }, - }); - - // Check content-length from the stream response headers - const contentLength = fileStreamResponse?.headers['content-length']; - if (contentLength && parseInt(contentLength) > MAX_DEVREV_ARTIFACT_SIZE) { - console.warn( - `Attachment ${id} size (${contentLength} bytes) exceeds maximum limit of ${MAX_DEVREV_ARTIFACT_SIZE} bytes. Skipping download.` - ); - - // Destroy the stream since we won't use it - if (fileStreamResponse != null) { - destroyHttpStream(fileStreamResponse); - } - - return { - error: { - message: `File size exceeds maximum limit of ${MAX_DEVREV_ARTIFACT_SIZE} bytes.`, - }, - }; - } - - return { httpStream: fileStreamResponse }; - } catch (error) { - // If we created a stream but failed afterwards, destroy it - if (fileStreamResponse != null) { - destroyHttpStream(fileStreamResponse); - } - - return { - error: { - message: `Error while getting attachment stream for attachment with id ${id}. ${error}`, - }, - }; - } -}; - -/** - * Destroys a stream to prevent memory leaks. - * @param {any} httpStream - The axios response stream to destroy - */ -const destroyHttpStream = (httpStream: AxiosResponse): void => { - try { - if (httpStream && httpStream.data) { - if (typeof httpStream.data.destroy === 'function') { - httpStream.data.destroy(); - } else if (typeof httpStream.data.close === 'function') { - httpStream.data.close(); - } - } - } catch (error) { - console.warn('Error while destroying HTTP stream:', error); - } -}; - -processTask({ - task: async ({ adapter }) => { - try { - const response = await adapter.streamAttachments({ - stream: getAttachmentStream, - batchSize: 10, - }); - - if (response?.delay) { - await adapter.emit(ExtractorEventType.ExtractionAttachmentsDelay, { - delay: response.delay, - }); - } else if (response?.error) { - await adapter.emit(ExtractorEventType.ExtractionAttachmentsError, { - error: response.error, - }); - } else { - await adapter.emit(ExtractorEventType.ExtractionAttachmentsDone); - } - } catch (error) { - console.error('An error occured while processing a task.', error); - } - }, - onTimeout: async ({ adapter }) => { - await adapter.emit(ExtractorEventType.ExtractionAttachmentsProgress); - }, -}); diff --git a/src/workers/default-workers/data-deletion.ts b/src/workers/default-workers/data-deletion.ts deleted file mode 100644 index d9748e1..0000000 --- a/src/workers/default-workers/data-deletion.ts +++ /dev/null @@ -1,14 +0,0 @@ -import { ExtractorEventType, processTask } from '../../index'; - -processTask({ - task: async ({ adapter }) => { - await adapter.emit(ExtractorEventType.ExtractionDataDeleteDone); - }, - onTimeout: async ({ adapter }) => { - await adapter.emit(ExtractorEventType.ExtractionDataDeleteError, { - error: { - message: 'Failed to delete data. Lambda timeout.', - }, - }); - }, -}); diff --git a/src/workers/default-workers/data-extraction.ts b/src/workers/default-workers/data-extraction.ts deleted file mode 100644 index 682e3cf..0000000 --- a/src/workers/default-workers/data-extraction.ts +++ /dev/null @@ -1,107 +0,0 @@ -import { EventType, ExtractorEventType, processTask } from '../../index'; - -import { - normalizeAttachment, - normalizeIssue, - normalizeUser, -} from '../dummy-extractor/data-normalization'; - -// Dummy data that originally would be fetched from an external source -const issues = [ - { - id: 'issue-1', - created_date: '1999-12-25T01:00:03+01:00', - modified_date: '1999-12-25T01:00:03+01:00', - body: '

This is issue 1

', - creator: 'user-1', - owner: 'user-1', - title: 'Issue 1', - }, - { - id: 'issue-2', - created_date: '1999-12-27T15:31:34+01:00', - modified_date: '2002-04-09T01:55:31+02:00', - body: '

This is issue 2

', - creator: 'user-2', - owner: 'user-2', - title: 'Issue 2', - }, -]; - -const users = [ - { - id: 'user-1', - created_date: '1999-12-25T01:00:03+01:00', - modified_date: '1999-12-25T01:00:03+01:00', - data: { - email: 'johndoe@test.com', - name: 'John Doe', - }, - }, - { - id: 'user-2', - created_date: '1999-12-27T15:31:34+01:00', - modified_date: '2002-04-09T01:55:31+02:00', - data: { - email: 'janedoe@test.com', - name: 'Jane Doe', - }, - }, -]; - -const attachments = [ - { - url: 'https://app.dev.devrev-eng.ai/favicon.ico', - id: 'attachment-1', - file_name: 'dummy.jpg', - author_id: 'user-1', - parent_id: 'issue-1', - }, - { - url: 'https://app.dev.devrev-eng.ai/favicon.ico', - id: 'attachment-2', - file_name: 'dummy.ico', - author_id: 'user-2', - parent_id: 'issue-2', - }, -]; - -const repos = [ - { - itemType: 'issues', - normalize: normalizeIssue, - }, - { - itemType: 'users', - normalize: normalizeUser, - }, - { - itemType: 'attachments', - normalize: normalizeAttachment, - }, -]; - -processTask({ - task: async ({ adapter }) => { - console.log('Logging something from worker thread', {}); - - adapter.initializeRepos(repos); - if (adapter.event.payload.event_type === EventType.ExtractionDataStart) { - await adapter.getRepo('issues')?.push(issues); - await adapter.emit(ExtractorEventType.ExtractionDataProgress, { - progress: 50, - }); - } else { - await adapter.getRepo('users')?.push(users); - await adapter.getRepo('attachments')?.push(attachments); - await adapter.emit(ExtractorEventType.ExtractionDataDone, { - progress: 100, - }); - } - }, - onTimeout: async ({ adapter }) => { - await adapter.emit(ExtractorEventType.ExtractionDataProgress, { - progress: 50, - }); - }, -}); diff --git a/src/workers/default-workers/delete-loader-attachment-state.ts b/src/workers/default-workers/delete-loader-attachment-state.ts deleted file mode 100644 index dc924f8..0000000 --- a/src/workers/default-workers/delete-loader-attachment-state.ts +++ /dev/null @@ -1,14 +0,0 @@ -import { LoaderEventType, processTask } from '../../index'; - -processTask({ - task: async ({ adapter }) => { - await adapter.emit(LoaderEventType.LoaderAttachmentStateDeletionDone); - }, - onTimeout: async ({ adapter }) => { - await adapter.emit(LoaderEventType.LoaderAttachmentStateDeletionError, { - error: { - message: 'Failed to delete attachment state. Timeout.', - }, - }); - }, -}); diff --git a/src/workers/default-workers/delete-loader-state.ts b/src/workers/default-workers/delete-loader-state.ts deleted file mode 100644 index 875e08a..0000000 --- a/src/workers/default-workers/delete-loader-state.ts +++ /dev/null @@ -1,14 +0,0 @@ -import { LoaderEventType, processTask } from '../../index'; - -processTask({ - task: async ({ adapter }) => { - await adapter.emit(LoaderEventType.LoaderStateDeletionDone); - }, - onTimeout: async ({ adapter }) => { - await adapter.emit(LoaderEventType.LoaderStateDeletionError, { - error: { - message: 'Failed to delete data. Lambda timeout.', - }, - }); - }, -}); diff --git a/src/workers/default-workers/external-sync-units-extraction.ts b/src/workers/default-workers/external-sync-units-extraction.ts deleted file mode 100644 index ac5aee7..0000000 --- a/src/workers/default-workers/external-sync-units-extraction.ts +++ /dev/null @@ -1,27 +0,0 @@ -import { ExternalSyncUnit, ExtractorEventType, processTask } from '../../index'; - -// Dummy data that originally would be fetched from an external source -const externalSyncUnits: ExternalSyncUnit[] = [ - { - id: 'devrev', - name: 'devrev', - description: 'Demo external sync unit', - item_count: 2, - item_type: 'issues', - }, -]; - -processTask({ - task: async ({ adapter }) => { - await adapter.emit(ExtractorEventType.ExtractionExternalSyncUnitsDone, { - external_sync_units: externalSyncUnits, - }); - }, - onTimeout: async ({ adapter }) => { - await adapter.emit(ExtractorEventType.ExtractionExternalSyncUnitsError, { - error: { - message: 'Failed to extract external sync units. Lambda timeout.', - }, - }); - }, -}); diff --git a/src/workers/default-workers/load-attachments.ts b/src/workers/default-workers/load-attachments.ts deleted file mode 100644 index ccd663d..0000000 --- a/src/workers/default-workers/load-attachments.ts +++ /dev/null @@ -1,19 +0,0 @@ -import { LoaderEventType } from '../../types'; -import { processTask } from '../process-task'; - -processTask({ - task: async ({ adapter }) => { - await adapter.emit(LoaderEventType.UnknownEventType, { - error: { - message: - 'Event type ' + adapter.event.payload.event_type + ' not supported.', - }, - }); - }, - onTimeout: async ({ adapter }) => { - await adapter.emit(LoaderEventType.AttachmentLoadingError, { - reports: adapter.reports, - processed_files: adapter.processedFiles, - }); - }, -}); diff --git a/src/workers/default-workers/load-data.ts b/src/workers/default-workers/load-data.ts deleted file mode 100644 index 35ff361..0000000 --- a/src/workers/default-workers/load-data.ts +++ /dev/null @@ -1,17 +0,0 @@ -import { LoaderEventType } from '../../types/loading'; -import { processTask } from '../process-task'; - -processTask({ - task: async ({ adapter }) => { - await adapter.emit(LoaderEventType.DataLoadingDone, { - reports: adapter.reports, - processed_files: adapter.processedFiles, - }); - }, - onTimeout: async ({ adapter }) => { - await adapter.emit(LoaderEventType.DataLoadingError, { - reports: adapter.reports, - processed_files: adapter.processedFiles, - }); - }, -}); diff --git a/src/workers/default-workers/metadata-extraction.ts b/src/workers/default-workers/metadata-extraction.ts deleted file mode 100644 index fbc1ecd..0000000 --- a/src/workers/default-workers/metadata-extraction.ts +++ /dev/null @@ -1,24 +0,0 @@ -import { ExtractorEventType, processTask } from '../../index'; - -import externalDomainMetadata from '../dummy-extractor/external_domain_metadata.json'; - -const repos = [ - { - itemType: 'external_domain_metadata', - }, -]; - -processTask({ - task: async ({ adapter }) => { - adapter.initializeRepos(repos); - await adapter - .getRepo('external_domain_metadata') - ?.push([externalDomainMetadata]); - await adapter.emit(ExtractorEventType.ExtractionMetadataDone); - }, - onTimeout: async ({ adapter }) => { - await adapter.emit(ExtractorEventType.ExtractionMetadataError, { - error: { message: 'Failed to extract metadata. Lambda timeout.' }, - }); - }, -}); diff --git a/src/workers/dummy-extractor/data-normalization.ts b/src/workers/dummy-extractor/data-normalization.ts deleted file mode 100644 index 5299c16..0000000 --- a/src/workers/dummy-extractor/data-normalization.ts +++ /dev/null @@ -1,40 +0,0 @@ -import { NormalizedAttachment, NormalizedItem } from '../../index'; - -// eslint-disable-next-line @typescript-eslint/no-explicit-any -export function normalizeIssue(item: any): NormalizedItem { - return { - id: item.id, - created_date: item.created_date, - modified_date: item.modified_date, - data: { - body: item.body, - creator: item.creator, - owner: item.owner, - title: item.title, - }, - }; -} - -// eslint-disable-next-line @typescript-eslint/no-explicit-any -export function normalizeUser(item: any): NormalizedItem { - return { - id: item.id, - created_date: item.created_date, - modified_date: item.modified_date, - data: { - email: item.email, - name: item.name, - }, - }; -} - -// eslint-disable-next-line @typescript-eslint/no-explicit-any -export function normalizeAttachment(item: any): NormalizedAttachment { - return { - url: item.url, - id: item.id, - file_name: item.file_name, - author_id: item.author_id, - parent_id: item.parent_id, - }; -} diff --git a/src/workers/dummy-extractor/external_domain_metadata.json b/src/workers/dummy-extractor/external_domain_metadata.json deleted file mode 100644 index 2dbba0a..0000000 --- a/src/workers/dummy-extractor/external_domain_metadata.json +++ /dev/null @@ -1,58 +0,0 @@ -{ - "record_types": { - "issues": { - "name": "Issues", - "fields": { - "title": { - "is_required": true, - "type": "text", - "name": "Title", - "text": { - "min_length": 1 - } - }, - "body": { - "type": "rich_text", - "name": "body", - "is_required": true - }, - "owner": { - "is_required": true, - "type": "reference", - "reference": { - "refers_to": { - "#record:users": {} - } - } - }, - "creator": { - "is_required": true, - "type": "reference", - "reference": { - "refers_to": { - "#record:users": {} - } - } - } - } - }, - "users": { - "name": "Users", - "fields": { - "name": { - "is_required": true, - "type": "text", - "name": "Name", - "text": { - "min_length": 1 - } - }, - "email": { - "type": "text", - "name": "Email", - "is_required": true - } - } - } - } -} diff --git a/src/workers/process-task.ts b/src/workers/process-task.ts index b7300b0..2f76617 100644 --- a/src/workers/process-task.ts +++ b/src/workers/process-task.ts @@ -1,4 +1,5 @@ import { isMainThread, parentPort, workerData } from 'node:worker_threads'; +import { translateIncomingEventType } from '../common/event-type-translation'; import { Logger, serializeError } from '../logger/logger'; import { createAdapterState } from '../state/state'; import { @@ -16,6 +17,12 @@ export function processTask({ void (async () => { try { const event = workerData.event; + + // TODO: Remove when the old types are completely phased out + event.payload.event_type = translateIncomingEventType( + event.payload.event_type + ); + const initialState = workerData.initialState as ConnectorState; const initialDomainMapping = workerData.initialDomainMapping; const options = workerData.options; diff --git a/src/workers/spawn.ts b/src/workers/spawn.ts index 05938f4..f08e989 100644 --- a/src/workers/spawn.ts +++ b/src/workers/spawn.ts @@ -2,6 +2,7 @@ import yargs from 'yargs'; import { hideBin } from 'yargs/helpers'; import { emit } from '../common/control-protocol'; +import { translateIncomingEventType } from '../common/event-type-translation'; import { getMemoryUsage, getTimeoutErrorEventType } from '../common/helpers'; import { Logger, serializeError } from '../logger/logger'; import { @@ -27,52 +28,27 @@ import { createWorker } from './create-worker'; function getWorkerPath({ event, - connectorWorkerPath, + workerBasePath }: GetWorkerPathInterface): string | null { - if (connectorWorkerPath) return connectorWorkerPath; let path = null; switch (event.payload.event_type) { - // Extraction - case EventType.ExtractionExternalSyncUnitsStart: - path = __dirname + '/default-workers/external-sync-units-extraction'; + case EventType.StartExtractingExternalSyncUnits: + path = '/workers/external-sync-units-extraction'; break; - case EventType.ExtractionMetadataStart: - path = __dirname + '/default-workers/metadata-extraction'; + case EventType.StartExtractingMetadata: + path = '/workers/metadata-extraction'; break; - case EventType.ExtractionDataStart: - case EventType.ExtractionDataContinue: - path = __dirname + '/default-workers/data-extraction'; + case EventType.StartExtractingData: + case EventType.ContinueExtractingData: + path = '/workers/data-extraction'; break; - case EventType.ExtractionAttachmentsStart: - case EventType.ExtractionAttachmentsContinue: - path = __dirname + '/default-workers/attachments-extraction'; + case EventType.StartExtractingAttachments: + case EventType.ContinueExtractingAttachments: + path = '/workers/attachments-extraction'; break; - case EventType.ExtractionDataDelete: - path = __dirname + '/default-workers/data-deletion'; - break; - case EventType.ExtractionAttachmentsDelete: - path = __dirname + '/default-workers/attachments-deletion'; - break; - - // Loading - case EventType.StartLoadingData: - case EventType.ContinueLoadingData: - path = __dirname + '/default-workers/load-data'; - break; - case EventType.StartLoadingAttachments: - case EventType.ContinueLoadingAttachments: - path = __dirname + '/default-workers/load-attachments'; - break; - case EventType.StartDeletingLoaderState: - path = __dirname + '/default-workers/delete-loader-state'; - break; - case EventType.StartDeletingLoaderAttachmentState: - path = __dirname + '/default-workers/delete-loader-attachment-state'; - break; - default: - path = null; } - return path; + + return path ? workerBasePath + path : null; } /** @@ -81,9 +57,9 @@ function getWorkerPath({ * The class provides utilities to emit control events to the platform and exit the worker gracefully. * In case of lambda timeout, the class emits a lambda timeout event to the platform. * @param {SpawnFactoryInterface} options - The options to create a new instance of Spawn class - * @param {AirdropEvent} event - The event object received from the platform - * @param {object} initialState - The initial state of the adapter - * @param {string} workerPath - The path to the worker file + * @param {AirdropEvent} options.event - The event object received from the platform + * @param {object} options.initialState - The initial state of the adapter + * @param {string} options.workerPath - The path to the worker file * @returns {Promise} - A new instance of Spawn class */ export async function spawn({ @@ -93,6 +69,22 @@ export async function spawn({ initialDomainMapping, options, }: SpawnFactoryInterface): Promise { + // Translates incoming event type for backwards compatibility + // This allows the SDK to accept both old and new event type formats + const originalEventType = event.payload.event_type; + const translatedEventType = translateIncomingEventType( + event.payload.event_type as string + ); + + // Update the event with the translated event type + event.payload.event_type = translatedEventType; + + if (translatedEventType !== originalEventType) { + console.log( + `Event type translated from ${originalEventType} to ${translatedEventType}.` + ); + } + if (options?.isLocalDevelopment) { console.log('Snap-in is running in local development mode.'); } @@ -109,10 +101,22 @@ export async function spawn({ const originalConsole = console; // eslint-disable-next-line no-global-assign console = new Logger({ event, options }); - const script = getWorkerPath({ - event, - connectorWorkerPath: workerPath, - }); + + let script = null; + if (workerPath != null) { + script = workerPath; + } else if ( + options?.baseWorkerPath != null && + options?.workerPathOverrides != null && + options.workerPathOverrides[translatedEventType as EventType] != null + ) { + script = options.baseWorkerPath + options.workerPathOverrides[translatedEventType as EventType]; + } else { + script = getWorkerPath({ + event, + workerBasePath: options?.baseWorkerPath ?? __dirname + }); + } if (script) { try { diff --git a/src/workers/worker-adapter.ts b/src/workers/worker-adapter.ts index 53519c4..8a5c144 100644 --- a/src/workers/worker-adapter.ts +++ b/src/workers/worker-adapter.ts @@ -204,7 +204,7 @@ export class WorkerAdapter { } // We want to upload all the repos before emitting the event, except for the external sync units done event - if (newEventType !== ExtractorEventType.ExtractionExternalSyncUnitsDone) { + if (newEventType !== ExtractorEventType.ExternalSyncUnitExtractionDone) { console.log( `Uploading all repos before emitting event with event type: ${newEventType}.` ); @@ -220,7 +220,7 @@ export class WorkerAdapter { } // If the extraction is done, we want to save the timestamp of the last successful sync - if (newEventType === ExtractorEventType.ExtractionAttachmentsDone) { + if (newEventType === ExtractorEventType.AttachmentExtractionDone) { console.log( `Overwriting lastSuccessfulSyncStarted with lastSyncStarted (${this.state.lastSyncStarted}).` ); @@ -375,7 +375,7 @@ export class WorkerAdapter { }); if (rateLimit?.delay) { - await this.emit(LoaderEventType.DataLoadingDelay, { + await this.emit(LoaderEventType.DataLoadingDelayed, { delay: rateLimit.delay, reports: this.reports, processed_files: this.processedFiles, @@ -478,7 +478,7 @@ export class WorkerAdapter { }); if (rateLimit?.delay) { - await this.emit(LoaderEventType.DataLoadingDelay, { + await this.emit(LoaderEventType.DataLoadingDelayed, { delay: rateLimit.delay, reports: this.reports, processed_files: this.processedFiles, diff --git a/src/workers/worker.js b/src/workers/worker.js index e627a3f..dee5480 100644 --- a/src/workers/worker.js +++ b/src/workers/worker.js @@ -3,7 +3,7 @@ const { workerData } = require('worker_threads'); require('ts-node').register(); const { Logger } = require('../logger/logger'); -// eslint-disable-next-line no-global-assign + console = new Logger({ event: workerData.event, options: workerData.options }); require(workerData.workerPath);