Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 91 additions & 5 deletions packages/core/src/destination-kit/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import type {
DynamicFieldContext,
ActionDestinationSuccessResponseType,
ActionDestinationErrorResponseType,
ResultMultiStatusNode
ResultMultiStatusNode,
AsyncPollResponseType
} from './types'
import { syncModeTypes } from './types'
import { HTTPError, NormalizedOptions } from '../request-client'
Expand Down Expand Up @@ -82,6 +83,13 @@ export interface BaseActionDefinition {
* The fields used to perform the action. These fields should match what the partner API expects.
*/
fields: ActionFields

/**
* The fields used specifically for polling async operations. These are typically minimal fields
* containing only identifiers needed to check operation status (e.g., operationId).
* REQUIRED when defining a poll method - ensures security and performance by validating only essential polling data.
*/
pollFields?: ActionFields
}

type HookValueTypes = string | boolean | number | Array<string | boolean | number>
Expand All @@ -103,7 +111,9 @@ export interface ActionDefinition<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
GeneratedActionHookInputs = any,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
GeneratedActionHookOutputs = any
GeneratedActionHookOutputs = any,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
PollPayload = any
> extends BaseActionDefinition {
/**
* A way to "register" dynamic fields.
Expand Down Expand Up @@ -139,6 +149,9 @@ export interface ActionDefinition<
/** The operation to perform when this action is triggered for a batch of events */
performBatch?: RequestFn<Settings, Payload[], PerformBatchResponse, AudienceSettings>

/** The operation to poll the status of async operation(s) - handles both single and batch operations */
poll?: RequestFn<Settings, PollPayload, AsyncPollResponseType, AudienceSettings>

/** Hooks are triggered at some point in a mappings lifecycle. They may perform a request with the
* destination using the provided inputs and return a response. The response may then optionally be stored
* in the mapping for later use in the action.
Expand Down Expand Up @@ -253,20 +266,27 @@ const isSyncMode = (value: unknown): value is SyncMode => {
* Action is the beginning step for all partner actions. Entrypoints always start with the
* MapAndValidateInput step.
*/
export class Action<Settings, Payload extends JSONLikeObject, AudienceSettings = any> extends EventEmitter {
readonly definition: ActionDefinition<Settings, Payload, AudienceSettings>
export class Action<
Settings,
Payload extends JSONLikeObject,
AudienceSettings = any,
PollPayload = unknown
> extends EventEmitter {
readonly definition: ActionDefinition<Settings, Payload, AudienceSettings, unknown, unknown, PollPayload>
readonly destinationName: string
readonly schema?: JSONSchema4
readonly pollSchema?: JSONSchema4
readonly hookSchemas?: Record<string, JSONSchema4>
readonly hasBatchSupport: boolean
readonly hasHookSupport: boolean
readonly hasPollSupport: boolean
// Payloads may be any type so we use `any` explicitly here.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
private extendRequest: RequestExtension<Settings, any> | undefined

constructor(
destinationName: string,
definition: ActionDefinition<Settings, Payload, AudienceSettings>,
definition: ActionDefinition<Settings, Payload, AudienceSettings, unknown, unknown, PollPayload>,
// Payloads may be any type so we use `any` explicitly here.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
extendRequest?: RequestExtension<Settings, any>
Expand All @@ -277,10 +297,16 @@ export class Action<Settings, Payload extends JSONLikeObject, AudienceSettings =
this.extendRequest = extendRequest
this.hasBatchSupport = typeof definition.performBatch === 'function'
this.hasHookSupport = definition.hooks !== undefined
this.hasPollSupport = typeof definition.poll === 'function'
// Generate json schema based on the field definitions
if (Object.keys(definition.fields ?? {}).length) {
this.schema = fieldsToJsonSchema(definition.fields)
}

// Generate json schema for poll fields if they exist
if (Object.keys(definition.pollFields ?? {}).length) {
this.pollSchema = fieldsToJsonSchema(definition.pollFields)
}
// Generate a json schema for each defined hook based on the field definitions
if (definition.hooks) {
for (const hookName in definition.hooks) {
Expand Down Expand Up @@ -586,6 +612,61 @@ export class Action<Settings, Payload extends JSONLikeObject, AudienceSettings =
return multiStatusResponse
}

async executePoll(
bundle: ExecuteBundle<Settings, InputData | undefined, AudienceSettings>
): Promise<AsyncPollResponseType> {
if (!this.hasPollSupport) {
throw new IntegrationError('This action does not support polling operations.', 'NotImplemented', 501)
}

// For polling operations, use the input data directly as it should already contain
// the structured poll payload (e.g., { operationId: "abc-123" })
const payload = bundle.data as PollPayload

// Remove empty values and validate using poll schema (required for polling operations)
if (!this.pollSchema) {
throw new IntegrationError('Poll fields must be defined for polling operations.', 'NotImplemented', 501)
}
const validationSchema = this.pollSchema
// Cast to PollPayload as the removeEmptyValues pipeline produces a valid poll payload
// This represents the PollPayload type defined in the ActionDefinition (e.g., { operationId: string })
const pollPayload = removeEmptyValues(payload, validationSchema, true) as PollPayload
// Validate the resolved payload against the poll schema
const schemaKey = `${this.destinationName}:${this.definition.title}:poll`
validateSchema(pollPayload, validationSchema, {
schemaKey,
statsContext: bundle.statsContext,
exempt: ['dynamicAuthSettings']
})

// Construct the data bundle to send to the poll action
const dataBundle = {
rawData: bundle.data,
rawMapping: bundle.mapping,
settings: bundle.settings,
payload: pollPayload,
auth: bundle.auth,
features: bundle.features,
statsContext: bundle.statsContext,
logger: bundle.logger,
engageDestinationCache: bundle.engageDestinationCache,
transactionContext: bundle.transactionContext,
stateContext: bundle.stateContext,
audienceSettings: bundle.audienceSettings,
subscriptionMetadata: bundle.subscriptionMetadata,
signal: bundle?.signal
}

// Construct the request client and perform the poll operation
const requestClient = this.createRequestClient(dataBundle)
if (!this.definition.poll) {
throw new IntegrationError('Poll method is not defined.', 'NotImplemented', 501)
}
const pollResponse = await this.definition.poll(requestClient, dataBundle)

return pollResponse
}

/*
* Extract the dynamic field context and handler path from a field string. Examples:
* - "structured.first_name" => { dynamicHandlerPath: "structured.first_name" }
Expand Down Expand Up @@ -716,6 +797,11 @@ export class Action<Settings, Payload extends JSONLikeObject, AudienceSettings =
}

private parseResponse(response: unknown): unknown {
// Handle async action responses by returning them as-is
if (response && typeof response === 'object' && (response as any).isAsync === true) {
return response
}

/**
* Try to use the parsed response `.data` or `.content` string
* @see {@link ../middleware/after-response/prepare-response.ts}
Expand Down
51 changes: 49 additions & 2 deletions packages/core/src/destination-kit/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import type {
Deletion,
DeletionPayload,
DynamicFieldResponse,
ResultMultiStatusNode
ResultMultiStatusNode,
AsyncActionResponseType,
AsyncPollResponseType
} from './types'
import type { AllRequestOptions } from '../request-client'
import { ErrorCodes, IntegrationError, InvalidAuthenticationError, MultiStatusErrorReporter } from '../errors'
Expand All @@ -44,7 +46,9 @@ export type {
ActionHookType,
ExecuteInput,
RequestFn,
Result
Result,
AsyncActionResponseType,
AsyncPollResponseType
}
export { hookTypeStrings }
export type { MinimalInputField }
Expand Down Expand Up @@ -716,6 +720,49 @@ export class Destination<Settings = JSONObject, AudienceSettings = JSONObject> {
return action.executeDynamicField(fieldKey, data, dynamicFn)
}

public async executePoll(
actionSlug: string,
{
event,
mapping,
subscriptionMetadata,
settings,
features,
statsContext,
logger,
engageDestinationCache,
transactionContext,
stateContext,
signal
}: EventInput<Settings>
): Promise<AsyncPollResponseType> {
const action = this.actions[actionSlug]
if (!action) {
throw new IntegrationError(`Action ${actionSlug} not found`, 'NotImplemented', 404)
}

let audienceSettings = {} as AudienceSettings
if (event.context?.personas) {
audienceSettings = event.context?.personas?.audience_settings as AudienceSettings
}
const authData = getAuthData(settings as JSONObject)
return action.executePoll({
mapping,
data: event as unknown as InputData,
settings,
audienceSettings,
auth: authData,
features,
statsContext,
logger,
engageDestinationCache,
transactionContext,
stateContext,
subscriptionMetadata,
signal
})
}

private async onSubscription(
subscription: Subscription,
events: SegmentEvent | SegmentEvent[],
Expand Down
34 changes: 34 additions & 0 deletions packages/core/src/destination-kit/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,40 @@ export type ActionDestinationErrorResponseType = {
body?: JSONLikeObject | string
}

export type AsyncActionResponseType = {
/** Indicates this is an async operation */
isAsync: true
/** Optional message about the async operation(s) */
message?: string
/** Initial status code */
status?: number
}

export type AsyncOperationResult = {
/** The current status of this operation */
status: 'pending' | 'completed' | 'failed'
/** Message about current state */
message?: string
/** Final result data when status is 'completed' */
result?: JSONLikeObject
/** Error information when status is 'failed' */
error?: {
code: string
message: string
}
/** Original context for this operation */
context?: JSONLikeObject
}

export type AsyncPollResponseType = {
/** Array of operation results - single element for individual operations, multiple for batch */
results: AsyncOperationResult[]
/** Overall status - completed when all operations are done */
overallStatus: 'pending' | 'completed' | 'failed' | 'partial'
/** Summary message */
message?: string
}

export type ResultMultiStatusNode =
| ActionDestinationSuccessResponseType
| (ActionDestinationErrorResponseType & {
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export { Destination, fieldsToJsonSchema } from './destination-kit'
export type { AsyncActionResponseType, AsyncPollResponseType } from './destination-kit'
export { getAuthData } from './destination-kit/parse-settings'
export { transform, Features } from './mapping-kit'
export {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const action: ActionDefinition<Settings, Payload> = {
}
},
perform: async (request, { settings, payload, hookOutputs }) => {
console.log('perform called')
const dataExtensionId: string =
hookOutputs?.onMappingSave?.outputs?.id || hookOutputs?.retlOnMappingSave?.outputs?.id

Expand All @@ -66,7 +67,7 @@ const action: ActionDefinition<Settings, Payload> = {
performBatch: async (request, { settings, payload, hookOutputs }) => {
const dataExtensionId: string =
hookOutputs?.onMappingSave?.outputs?.id || hookOutputs?.retlOnMappingSave?.outputs?.id

console.log('perform batch called', dataExtensionId)
if (!dataExtensionId) {
throw new IntegrationError('No Data Extension Connected', 'INVALID_CONFIGURATION', 400)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import nock from 'nock'
import { createTestEvent, createTestIntegration } from '@segment/actions-core'
import Destination from '../../index'

const testDestination = createTestIntegration(Destination)

describe('SalesforceMarketingCloud.contactDataExtensionV2Async', () => {
beforeEach(() => {
nock.cleanAll()
})

describe('perform', () => {
it('should throw error when perform method is called directly', async () => {
const event = createTestEvent({
type: 'identify',
userId: 'user-123',
traits: {
email: '[email protected]'
}
})

await expect(
testDestination.testAction('contactDataExtensionV2Async', {
event,
mapping: {
keys: {
contactKey: 'user-123'
},
values: {
email: '[email protected]'
}
},
settings: {
subdomain: 'mc123456789',
client_id: 'client_id_123',
client_secret: 'client_secret_123',
account_id: 'account_id_123'
}
})
).rejects.toThrowError('This action only supports batch operations')
})
})
})
Loading
Loading