Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

6619 modify event emitter to emit an array of events #6625

Merged
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
d522cbd
update events payload to be arrays
bosiraphael Aug 14, 2024
1855cb3
WIP: update listeners to handle array of events
bosiraphael Aug 14, 2024
1cd48a0
WIP: update listeners to handle array of events
bosiraphael Aug 14, 2024
6eb8b19
create TwentyEventEmitter
bosiraphael Aug 14, 2024
da80c7b
use TwentyEventEmitter to emit events
bosiraphael Aug 14, 2024
7614d0b
update twentyEventEmitter to workspaceEventEmitter
bosiraphael Aug 14, 2024
453af51
update listeners
bosiraphael Aug 14, 2024
7862eb3
update listeners
bosiraphael Aug 14, 2024
98a993e
update listeners
bosiraphael Aug 14, 2024
df3416f
update listeners
bosiraphael Aug 14, 2024
249a2ce
update listeners
bosiraphael Aug 14, 2024
40975e1
improvements to entity-events-to-db.listener
bosiraphael Aug 19, 2024
5a3cb4d
remove unnecessary async
bosiraphael Aug 19, 2024
609c6ca
improve calendar-blocklist.listener
bosiraphael Aug 19, 2024
3670887
replace return with continue in loops
bosiraphael Aug 19, 2024
04fe229
update database-event-trigger.listener
bosiraphael Aug 19, 2024
ba5bbed
create workspace event emitter module
bosiraphael Aug 19, 2024
6ca682b
import WorkspaceEventEmitterModule in QueueWorkerModule
bosiraphael Aug 19, 2024
a340ea2
Merge branch 'main' into 6619-modify-event-emitter-to-emit-an-array-o…
bosiraphael Aug 19, 2024
0366683
fixes after merge
bosiraphael Aug 19, 2024
3200b0f
Merge branch 'main' into 6619-modify-event-emitter-to-emit-an-array-o…
bosiraphael Aug 20, 2024
a63706f
refactor WorkspaceEventEmitter
bosiraphael Aug 20, 2024
3ef6831
fix wrong payload
bosiraphael Aug 20, 2024
cd3a285
fix participant matching
bosiraphael Aug 20, 2024
c78e768
fix calendar-event-cleaner
bosiraphael Aug 20, 2024
ab64dab
fix participant matching
bosiraphael Aug 20, 2024
3767a35
Merge branch 'main' into 6619-modify-event-emitter-to-emit-an-array-o…
charlesBochet Aug 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { objectRecordChangedValues } from 'src/engine/integrations/event-emitter
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { CreateAuditLogFromInternalEvent } from 'src/modules/timeline/jobs/create-audit-log-from-internal-event';
import { UpsertTimelineActivityFromInternalEvent } from 'src/modules/timeline/jobs/upsert-timeline-activity-from-internal-event.job';

Expand All @@ -19,18 +20,24 @@ export class EntityEventsToDbListener {
) {}

@OnEvent('*.created')
async handleCreate(payload: ObjectRecordCreateEvent<any>) {
async handleCreate(
payload: WorkspaceEventBatch<ObjectRecordCreateEvent<any>>,
) {
return this.handle(payload);
}

@OnEvent('*.updated')
async handleUpdate(payload: ObjectRecordUpdateEvent<any>) {
payload.properties.diff = objectRecordChangedValues(
payload.properties.before,
payload.properties.after,
payload.properties.updatedFields,
payload.objectMetadata,
);
async handleUpdate(
payload: WorkspaceEventBatch<ObjectRecordUpdateEvent<any>>,
) {
for (const eventPayload of payload.events) {
eventPayload.properties.diff = objectRecordChangedValues(
eventPayload.properties.before,
eventPayload.properties.after,
eventPayload.properties.updatedFields,
eventPayload.objectMetadata,
);
}

return this.handle(payload);
}
Expand All @@ -41,19 +48,21 @@ export class EntityEventsToDbListener {
// @OnEvent('*.restored') - TODO: implement when we soft delete has been implemented
// ....

private async handle(payload: ObjectRecordBaseEvent) {
if (!payload.objectMetadata?.isAuditLogged) {
return;
}
private async handle(payload: WorkspaceEventBatch<ObjectRecordBaseEvent>) {
for (const eventPayload of payload.events) {
if (!eventPayload.objectMetadata?.isAuditLogged) {
return;
}

this.messageQueueService.add<ObjectRecordBaseEvent>(
CreateAuditLogFromInternalEvent.name,
payload,
);
await this.messageQueueService.add<ObjectRecordBaseEvent>(
CreateAuditLogFromInternalEvent.name,
eventPayload,
);

this.messageQueueService.add<ObjectRecordBaseEvent>(
UpsertTimelineActivityFromInternalEvent.name,
payload,
);
await this.messageQueueService.add<ObjectRecordBaseEvent>(
UpsertTimelineActivityFromInternalEvent.name,
eventPayload,
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { OnEvent } from '@nestjs/event-emitter';
import { AnalyticsService } from 'src/engine/core-modules/analytics/analytics.service';
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/types/object-record-create.event';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';

@Injectable()
export class TelemetryListener {
Expand All @@ -13,36 +14,48 @@ export class TelemetryListener {
) {}

@OnEvent('*.created')
async handleAllCreate(payload: ObjectRecordCreateEvent<any>) {
await this.analyticsService.create(
{
type: 'track',
data: {
eventName: payload.name,
},
},
payload.userId,
payload.workspaceId,
'', // voluntarely not retrieving this
'', // to avoid slowing down
this.environmentService.get('SERVER_URL'),
async handleAllCreate(
payload: WorkspaceEventBatch<ObjectRecordCreateEvent<any>>,
) {
await Promise.all(
payload.events.map((eventPayload) =>
this.analyticsService.create(
{
type: 'track',
data: {
eventName: payload.name,
},
},
eventPayload.userId,
payload.workspaceId,
'', // voluntarily not retrieving this
'', // to avoid slowing down
this.environmentService.get('SERVER_URL'),
),
),
);
}

@OnEvent('user.signup')
async handleUserSignup(payload: ObjectRecordCreateEvent<any>) {
await this.analyticsService.create(
{
type: 'track',
data: {
eventName: 'user.signup',
},
},
payload.userId,
payload.workspaceId,
'',
'',
this.environmentService.get('SERVER_URL'),
async handleUserSignup(
payload: WorkspaceEventBatch<ObjectRecordCreateEvent<any>>,
) {
await Promise.all(
payload.events.map(async (eventPayload) =>
this.analyticsService.create(
{
type: 'track',
data: {
eventName: 'user.signup',
},
},
eventPayload.userId,
payload.workspaceId,
'',
'',
this.environmentService.get('SERVER_URL'),
),
),
);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Injectable, Logger } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';

import isEmpty from 'lodash.isempty';
import { DataSource } from 'typeorm';
Expand Down Expand Up @@ -52,6 +51,8 @@ import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.
import { computeObjectTargetTable } from 'src/engine/utils/compute-object-target-table.util';
import { isQueryTimeoutError } from 'src/engine/utils/query-timeout.util';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
import { isDefined } from 'src/utils/is-defined';

import {
PGGraphQLMutation,
Expand All @@ -75,7 +76,7 @@ export class WorkspaceQueryRunnerService {
private readonly queryResultGettersFactory: QueryResultGettersFactory,
@InjectMessageQueue(MessageQueue.webhookQueue)
private readonly messageQueueService: MessageQueueService,
private readonly eventEmitter: EventEmitter2,
private readonly workspaceEventEmitter: WorkspaceEventEmitter,
private readonly workspaceQueryHookService: WorkspaceQueryHookService,
private readonly environmentService: EnvironmentService,
private readonly duplicateService: DuplicateService,
Expand Down Expand Up @@ -295,18 +296,21 @@ export class WorkspaceQueryRunnerService {
options,
);

parsedResults.forEach((record) => {
this.eventEmitter.emit(`${objectMetadataItem.nameSingular}.created`, {
name: `${objectMetadataItem.nameSingular}.created`,
workspaceId: authContext.workspace.id,
userId: authContext.user?.id,
recordId: record.id,
objectMetadata: objectMetadataItem,
properties: {
after: record,
},
} satisfies ObjectRecordCreateEvent<any>);
});
this.workspaceEventEmitter.emit(
`${objectMetadataItem.nameSingular}.created`,
parsedResults.map(
(record) =>
({
userId: authContext.user?.id,
recordId: record.id,
objectMetadata: objectMetadataItem,
properties: {
after: record,
},
}) satisfies ObjectRecordCreateEvent<any>,
),
authContext.workspace.id,
);

return parsedResults;
}
Expand Down Expand Up @@ -431,18 +435,22 @@ export class WorkspaceQueryRunnerService {
options,
);

this.eventEmitter.emit(`${objectMetadataItem.nameSingular}.updated`, {
name: `${objectMetadataItem.nameSingular}.updated`,
workspaceId: authContext.workspace.id,
userId: authContext.user?.id,
recordId: existingRecord.id,
objectMetadata: objectMetadataItem,
properties: {
updatedFields: Object.keys(args.data),
before: this.removeNestedProperties(existingRecord as Record),
after: this.removeNestedProperties(parsedResults?.[0]),
},
} satisfies ObjectRecordUpdateEvent<any>);
this.workspaceEventEmitter.emit(
`${objectMetadataItem.nameSingular}.updated`,
[
{
userId: authContext.user?.id,
recordId: existingRecord.id,
objectMetadata: objectMetadataItem,
properties: {
updatedFields: Object.keys(args.data),
before: this.removeNestedProperties(existingRecord as Record),
after: this.removeNestedProperties(parsedResults?.[0]),
},
} satisfies ObjectRecordUpdateEvent<any>,
],
authContext.workspace.id,
);

return parsedResults?.[0];
}
Expand Down Expand Up @@ -504,30 +512,36 @@ export class WorkspaceQueryRunnerService {
options,
);

parsedResults.forEach((record) => {
const existingRecord = mappedRecords.get(record.id);
const eventsToEmit: ObjectRecordUpdateEvent<any>[] = parsedResults
.map((record) => {
const existingRecord = mappedRecords.get(record.id);

if (!existingRecord) {
this.logger.warn(
`Record with id ${record.id} not found in the database`,
);
if (!existingRecord) {
this.logger.warn(
`Record with id ${record.id} not found in the database`,
);

return;
}
return;
}

this.eventEmitter.emit(`${objectMetadataItem.nameSingular}.updated`, {
name: `${objectMetadataItem.nameSingular}.updated`,
workspaceId: authContext.workspace.id,
userId: authContext.user?.id,
recordId: existingRecord.id,
objectMetadata: objectMetadataItem,
properties: {
updatedFields: Object.keys(args.data),
before: this.removeNestedProperties(existingRecord as Record),
after: this.removeNestedProperties(record),
},
} satisfies ObjectRecordUpdateEvent<any>);
});
return {
userId: authContext.user?.id,
recordId: existingRecord.id,
objectMetadata: objectMetadataItem,
properties: {
updatedFields: Object.keys(args.data),
before: this.removeNestedProperties(existingRecord as Record),
after: this.removeNestedProperties(record),
},
};
})
.filter(isDefined);

this.workspaceEventEmitter.emit(
`${objectMetadataItem.nameSingular}.updated`,
eventsToEmit,
authContext.workspace.id,
);

return parsedResults;
}
Expand Down Expand Up @@ -580,18 +594,21 @@ export class WorkspaceQueryRunnerService {
options,
);

parsedResults.forEach((record) => {
this.eventEmitter.emit(`${objectMetadataItem.nameSingular}.deleted`, {
name: `${objectMetadataItem.nameSingular}.deleted`,
workspaceId: authContext.workspace.id,
userId: authContext.user?.id,
recordId: record.id,
objectMetadata: objectMetadataItem,
properties: {
before: this.removeNestedProperties(record),
},
} satisfies ObjectRecordDeleteEvent<any>);
});
this.workspaceEventEmitter.emit(
`${objectMetadataItem.nameSingular}.deleted`,
parsedResults.map(
(record) =>
({
userId: authContext.user?.id,
recordId: record.id,
objectMetadata: objectMetadataItem,
properties: {
before: this.removeNestedProperties(record),
},
}) satisfies ObjectRecordDeleteEvent<any>,
),
authContext.workspace.id,
);

return parsedResults;
}
Expand Down Expand Up @@ -644,19 +661,23 @@ export class WorkspaceQueryRunnerService {
options,
);

this.eventEmitter.emit(`${objectMetadataItem.nameSingular}.deleted`, {
name: `${objectMetadataItem.nameSingular}.deleted`,
workspaceId: authContext.workspace.id,
userId: authContext.user?.id,
recordId: args.id,
objectMetadata: objectMetadataItem,
properties: {
before: {
...(existingRecord ?? {}),
...this.removeNestedProperties(parsedResults?.[0]),
},
},
} satisfies ObjectRecordDeleteEvent<any>);
this.workspaceEventEmitter.emit(
`${objectMetadataItem.nameSingular}.deleted`,
[
{
userId: authContext.user?.id,
recordId: args.id,
objectMetadata: objectMetadataItem,
properties: {
before: {
...(existingRecord ?? {}),
...this.removeNestedProperties(parsedResults?.[0]),
},
},
} satisfies ObjectRecordDeleteEvent<any>,
],
authContext.workspace.id,
);

return parsedResults?.[0];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/t
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity';

@Injectable()
Expand All @@ -23,7 +24,9 @@ export class BillingWorkspaceMemberListener {
@OnEvent('workspaceMember.created')
@OnEvent('workspaceMember.deleted')
async handleCreateOrDeleteEvent(
payload: ObjectRecordCreateEvent<WorkspaceMemberWorkspaceEntity>,
payload: WorkspaceEventBatch<
ObjectRecordCreateEvent<WorkspaceMemberWorkspaceEntity>
>,
) {
if (!this.environmentService.get('IS_BILLING_ENABLED')) {
return;
Expand Down
Loading
Loading