Skip to content

Commit

Permalink
6619 modify event emitter to emit an array of events (#6625)
Browse files Browse the repository at this point in the history
Closes #6619

---------

Co-authored-by: Charles Bochet <[email protected]>
  • Loading branch information
bosiraphael and charlesBochet authored Aug 20, 2024
1 parent 17a1760 commit 091c0f8
Show file tree
Hide file tree
Showing 41 changed files with 1,004 additions and 721 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { objectRecordChangedValues } from 'src/engine/integrations/event-emitter
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { CreateAuditLogFromInternalEvent } from 'src/modules/timeline/jobs/create-audit-log-from-internal-event';
import { UpsertTimelineActivityFromInternalEvent } from 'src/modules/timeline/jobs/upsert-timeline-activity-from-internal-event.job';

Expand All @@ -19,40 +20,46 @@ export class EntityEventsToDbListener {
) {}

@OnEvent('*.created')
async handleCreate(payload: ObjectRecordCreateEvent<any>) {
async handleCreate(
payload: WorkspaceEventBatch<ObjectRecordCreateEvent<any>>,
) {
return this.handle(payload);
}

@OnEvent('*.updated')
async handleUpdate(payload: ObjectRecordUpdateEvent<any>) {
payload.properties.diff = objectRecordChangedValues(
payload.properties.before,
payload.properties.after,
payload.properties.updatedFields,
payload.objectMetadata,
);
async handleUpdate(
payload: WorkspaceEventBatch<ObjectRecordUpdateEvent<any>>,
) {
for (const eventPayload of payload.events) {
eventPayload.properties.diff = objectRecordChangedValues(
eventPayload.properties.before,
eventPayload.properties.after,
eventPayload.properties.updatedFields,
eventPayload.objectMetadata,
);
}

return this.handle(payload);
}

@OnEvent('*.deleted')
async handleDelete(payload: ObjectRecordUpdateEvent<any>) {
async handleDelete(
payload: WorkspaceEventBatch<ObjectRecordUpdateEvent<any>>,
) {
return this.handle(payload);
}

private async handle(payload: ObjectRecordBaseEvent) {
if (!payload.objectMetadata?.isAuditLogged) {
return;
}

this.messageQueueService.add<ObjectRecordBaseEvent>(
CreateAuditLogFromInternalEvent.name,
payload,
private async handle(payload: WorkspaceEventBatch<ObjectRecordBaseEvent>) {
payload.events = payload.events.filter(
(event) => event.objectMetadata?.isAuditLogged,
);

this.messageQueueService.add<ObjectRecordBaseEvent>(
UpsertTimelineActivityFromInternalEvent.name,
payload,
);
await this.messageQueueService.add<
WorkspaceEventBatch<ObjectRecordBaseEvent>
>(CreateAuditLogFromInternalEvent.name, payload);

await this.messageQueueService.add<
WorkspaceEventBatch<ObjectRecordBaseEvent>
>(UpsertTimelineActivityFromInternalEvent.name, payload);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { OnEvent } from '@nestjs/event-emitter';
import { AnalyticsService } from 'src/engine/core-modules/analytics/analytics.service';
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/types/object-record-create.event';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';

@Injectable()
export class TelemetryListener {
Expand All @@ -13,36 +14,48 @@ export class TelemetryListener {
) {}

@OnEvent('*.created')
async handleAllCreate(payload: ObjectRecordCreateEvent<any>) {
await this.analyticsService.create(
{
type: 'track',
data: {
eventName: payload.name,
},
},
payload.userId,
payload.workspaceId,
'', // voluntarely not retrieving this
'', // to avoid slowing down
this.environmentService.get('SERVER_URL'),
async handleAllCreate(
payload: WorkspaceEventBatch<ObjectRecordCreateEvent<any>>,
) {
await Promise.all(
payload.events.map((eventPayload) =>
this.analyticsService.create(
{
type: 'track',
data: {
eventName: payload.name,
},
},
eventPayload.userId,
payload.workspaceId,
'', // voluntarily not retrieving this
'', // to avoid slowing down
this.environmentService.get('SERVER_URL'),
),
),
);
}

@OnEvent('user.signup')
async handleUserSignup(payload: ObjectRecordCreateEvent<any>) {
await this.analyticsService.create(
{
type: 'track',
data: {
eventName: 'user.signup',
},
},
payload.userId,
payload.workspaceId,
'',
'',
this.environmentService.get('SERVER_URL'),
async handleUserSignup(
payload: WorkspaceEventBatch<ObjectRecordCreateEvent<any>>,
) {
await Promise.all(
payload.events.map((eventPayload) =>
this.analyticsService.create(
{
type: 'track',
data: {
eventName: 'user.signup',
},
},
eventPayload.userId,
payload.workspaceId,
'',
'',
this.environmentService.get('SERVER_URL'),
),
),
);
}
}
Loading

0 comments on commit 091c0f8

Please sign in to comment.