Skip to content

Commit

Permalink
Merge pull request #62 from Callgent/fix/invocation
Browse files Browse the repository at this point in the history
feat: sparkpost relay-webhook integration
  • Loading branch information
Jamesp918 committed Sep 2, 2024
2 parents 8a2bd99 + d3c0fe4 commit b0c5eed
Show file tree
Hide file tree
Showing 28 changed files with 540 additions and 166 deletions.
6 changes: 6 additions & 0 deletions .env.dev
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,11 @@ CALLGENT_SITE_URL=https://callgent.com
CALLGENT_SITE_NAME=Callgent.com

EMAIL_DEFAULT_SENDER={"name": "Callgent", "email": "[email protected]"}
EMAIL_RELAY_HOST=my.callgent.com

# https://app.brevo.com/settings/keys/api
EMAIL_BREVO_API_KEY=key
EMAIL_SPARKPOST_API_KEY=xxx
EMAIL_SPARKPOST_RELAY_CLIENT_ID=spark-post-relayer
EMAIL_SPARKPOST_RELAY_CLIENT_SECRET=xxx
EMAIL_SPARKPOST_RELAY_EXPIRES_IN=86400
6 changes: 6 additions & 0 deletions .env.test
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,11 @@ CALLGENT_SITE_URL=https://callgent.com
CALLGENT_SITE_NAME=Callgent.com

EMAIL_DEFAULT_SENDER={"name": "Callgent", "email": "[email protected]"}
EMAIL_RELAY_HOST=my.callgent.com

# https://app.brevo.com/settings/keys/api
EMAIL_BREVO_API_KEY=key
EMAIL_SPARKPOST_API_KEY=xxx
EMAIL_SPARKPOST_RELAY_CLIENT_ID=spark-post-relayer
EMAIL_SPARKPOST_RELAY_CLIENT_SECRET=xxx
EMAIL_SPARKPOST_RELAY_EXPIRES_IN=86400
6 changes: 3 additions & 3 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -504,21 +504,21 @@ model EventStore {
targetId String? @db.VarChar(36)
eventType String @db.VarChar(36)
dataType String @db.VarChar(36)
/// @Description callback url or parent event id
/// @Description callback url or parent event id to invoke
callback String? @db.VarChar(1023)
/// @Description callback type, 'URL' or 'EVENT'
callbackType EventCallbackType @default(EVENT)
data Json? @db.Json
context Json? @db.Json
/// @description statusCode, -1: processing, 0: done, 1: pending, >1: error
/// @description statusCode, -1: processing, 0: done, 1: pending: waiting for external event trigger to to resume calling current-listener.funName, >1: error
statusCode Int @default(-1)
message String? @db.VarChar(255)
stopPropagation Boolean
defaultPrevented Boolean
listenerId String? @db.VarChar(36)
funName String? @db.VarChar(255)
funName String? @db.VarChar(255) /// @description listener's function to call on pending event activated.
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
Expand Down
10 changes: 5 additions & 5 deletions prisma/seed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,21 +82,21 @@ function initEventListeners() {
serviceName: 'AgentsService',
funName: 'map2Function',
description:
'Map the request event to a endpoint function, put into event.context.map2Function and event.context.functions[0]',
'Map the request to a endpoint function and corresponding args, put into event.context.map2Function and event.context.functions[0]',
createdBy: 'GLOBAL',
priority: (priority += 100),
},
{
pk: elId++,
id: 'CR-MAP-2-ARGS',
id: 'CR-INVOKE-SEP',
srcId: 'GLOBAL',
tenantPk: 0,
eventType: 'CLIENT_REQUEST',
dataType: '*',
serviceType: 'SERVICE',
serviceName: 'SandBoxService',
funName: 'map2Args',
description: 'Map the request event to the function arguments',
serviceName: 'EndpointsService',
funName: 'invokeSEP',
description: 'Do actual invocation through the SEP adaptor',
createdBy: 'GLOBAL',
priority: (priority += 100),
},
Expand Down
24 changes: 15 additions & 9 deletions src/agents/agents.service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { BadRequestException, HttpException, Injectable } from '@nestjs/common';
import { CallgentFunctionDto } from '../callgent-functions/dto/callgent-function.dto';
import { AdaptedDataSource } from '../endpoints/adaptors/endpoint-adaptor.interface';
import { AdaptedDataSource } from '../endpoints/adaptors/endpoint-adaptor.base';
import { TaskActionDto } from '../task-actions/dto/task-action.dto';
import { LLMService } from './llm.service';
import { ClientRequestEvent } from '../endpoints/events/client-request.event';
Expand Down Expand Up @@ -35,7 +35,7 @@ export class AgentsService {
*/
async map2Function(
reqEvent: ClientRequestEvent,
): Promise<void | { event: ClientRequestEvent; callbackName?: string }> {
): Promise<void | { data: ClientRequestEvent; callbackName?: string }> {
const {
id,
srcId,
Expand All @@ -52,6 +52,8 @@ export class AgentsService {

// FIXME map from all targetId events

// TODO how to use mapping function: for specific req & function

const mapped = await this.llmService.template(
'map2Function',
{
Expand All @@ -72,16 +74,20 @@ export class AgentsService {
);

// emit progressive requesting event
const { event: prEvent, statusCode } =
const { data: prEvent, statusCode } =
await this.eventListenersService.emit(
new ProgressiveRequestEvent(srcId, id, cepAdaptor, { progressive }),
new ProgressiveRequestEvent(srcId, id, cepAdaptor, {
progressive,
// mapped,
}),
);
if (!statusCode) {
if (!statusCode)
// direct return, no persistent async
return this.map2FunctionProgressive(prEvent, reqEvent);
}

if (statusCode == 1)
return { event: reqEvent, callbackName: 'map2FunctionProgressive' };
// pending
return { data: reqEvent, callbackName: 'map2FunctionProgressive' };
throw new HttpException(prEvent.message, statusCode);
} else {
const functions = reqEvent.context.functions.filter(
Expand All @@ -95,9 +101,9 @@ export class AgentsService {

/** progressive response, to continue mapping */
async map2FunctionProgressive(
event: ProgressiveRequestEvent,
data: ProgressiveRequestEvent,
reqEvent?: ClientRequestEvent,
): Promise<void | { event: ClientRequestEvent; callbackName?: string }> {
): Promise<void | { data: ClientRequestEvent; callbackName?: string }> {
// handle resp
}

Expand Down
2 changes: 1 addition & 1 deletion src/callgent-functions/callgent-functions.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
getSchemaPath,
} from '@nestjs/swagger';
import { IsNotEmpty, IsOptional } from 'class-validator';
import { ApiSpec } from '../endpoints/adaptors/endpoint-adaptor.interface';
import { ApiSpec } from '../endpoints/adaptors/endpoint-adaptor.base';
import { EndpointDto } from '../endpoints/dto/endpoint.dto';
import { JwtGuard } from '../infra/auth/jwt/jwt.guard';
import { EntityIdExists } from '../infra/repo/validators/entity-exists.validator';
Expand Down
4 changes: 2 additions & 2 deletions src/callgent-functions/callgent-functions.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { TransactionalAdapterPrisma } from '@nestjs-cls/transactional-adapter-pr
import { BadRequestException, Inject, Injectable } from '@nestjs/common';
import { PaginatorTypes, paginator } from '@nodeteam/nestjs-prisma-pagination';
import { EndpointType, Prisma, PrismaClient } from '@prisma/client';
import { ApiSpec } from '../endpoints/adaptors/endpoint-adaptor.interface';
import { ApiSpec } from '../endpoints/adaptors/endpoint-adaptor.base';
import { EndpointDto } from '../endpoints/dto/endpoint.dto';
import { EndpointsService } from '../endpoints/endpoints.service';
import { ClientRequestEvent } from '../endpoints/events/client-request.event';
Expand Down Expand Up @@ -32,7 +32,7 @@ export class CallgentFunctionsService {
@Transactional()
async loadFunctions(
reqEvent: ClientRequestEvent,
): Promise<void | { event: ClientRequestEvent; callbackName?: string }> {
): Promise<void | { data: ClientRequestEvent; callbackName?: string }> {
const { funName, callgentId } = reqEvent.data;

// TODO if too many functions, use summary first
Expand Down
97 changes: 97 additions & 0 deletions src/emails/dto/sparkpost-relay-object.interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/**
* Represents a single email header.
*/
interface Header {
[key: string]: string;
}

/**
* Represents the content parsed from the incoming message.
*/
interface Content {
/**
* Contents of the last text/html part of the message.
*/
html: string;

/**
* Contents of the last text/plain part of the message.
*/
text: string;

/**
* "Subject" header value (decoded from email).
*/
subject: string;

/**
* "To" header value (decoded from email), RFC2822 address list.
*/
to: string[];

/**
* "CC" header value (decoded from email), RFC2822 address list.
*/
cc?: string[];

/**
* Ordered array of email top-level headers. This array preserves ordering and allows for multiple occurrences of a header (e.g. to support trace headers such as "Received").
*/
headers: Header[];

/**
* Raw MIME content for an email. If the Raw MIME content contains at least one non UTF-8 encoded character, the entire email_rfc822 value will be base64 encoded and email_rfc822_is_base64 will be set to true.
*/
email_rfc822: string;

/**
* Whether the email_rfc822 value is base64 encoded.
*/
email_rfc822_is_base64: boolean;
}

/**
* Represents the relay message.
*/
export interface RelayMessage {
/**
* The content parsed from the incoming message.
*/
content: Content;

/**
* Customer ID of the customer that created the relay webhook.
*/
customer_id: string;

/**
* Email address used to compose the "From" header.
*/
friendly_from: string;

/**
* SMTP envelope "MAIL FROM", matches "Return-Path" header address.
*/
msg_from: string;

/**
* SMTP envelope "RCPT TO".
*/
rcpt_to: string;

/**
* ID of the relay webhook which triggered this relay message.
*/
webhook_id: string;

/**
* Protocol of the originating inbound message.
*/
protocol?: string;
}

export interface EmailRelayObject {
msys: {
relay_message: RelayMessage;
};
}
18 changes: 18 additions & 0 deletions src/emails/emails.controller.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Test, TestingModule } from '@nestjs/testing';
import { EmailsController } from './emails.controller';

describe('EmailsController', () => {
let controller: EmailsController;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
controllers: [EmailsController],
}).compile();

controller = module.get<EmailsController>(EmailsController);
});

it('should be defined', () => {
expect(controller).toBeDefined();
});
});
40 changes: 40 additions & 0 deletions src/emails/emails.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import {
BadRequestException,
Body,
Controller,
Headers,
HttpCode,
HttpStatus,
Post,
} from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { JwtAuthService } from '../infra/auth/jwt/jwt.service';
import { EmailsService } from './emails.service';
import { EmailRelayObject } from './dto/sparkpost-relay-object.interface';

@Controller('emails')
export class EmailsController {
constructor(
private readonly jwtAuthService: JwtAuthService,
private readonly configService: ConfigService,
private readonly emailsService: EmailsService,
) {}

/** @see https://developers.sparkpost.com/api/relay-webhooks/ */
@HttpCode(HttpStatus.OK)
@Post('relay/spark-post')
async handleRelayEvent(
@Headers('Authorization') authorization: string,
@Body() relays: EmailRelayObject[],
) {
const { sub } = this.jwtAuthService.verify(authorization?.substring(7));
if (sub !== this.configService.get('EMAIL_SPARKPOST_RELAY_CLIENT_ID'))
throw new BadRequestException('Invalid Client ID');

// handle relay event
relays?.forEach((relay) =>
this.emailsService.handleRelayMessage(relay.msys.relay_message),
);
return; // return 200 to consume the event
}
}
2 changes: 2 additions & 0 deletions src/emails/emails.module.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { Global, Module } from '@nestjs/common';
import { EmailTemplateProvider } from './email-template.provider';
import { EmailsService } from './emails.service';
import { EmailsController } from './emails.controller';

@Global()
@Module({
providers: [EmailsService, EmailTemplateProvider],
exports: [EmailsService],
controllers: [EmailsController],
})
export class EmailsModule {}
Loading

0 comments on commit b0c5eed

Please sign in to comment.