diff --git a/backend/src/database/repositories/activityRepository.ts b/backend/src/database/repositories/activityRepository.ts index c6d3192f85..c1b2c8f5be 100644 --- a/backend/src/database/repositories/activityRepository.ts +++ b/backend/src/database/repositories/activityRepository.ts @@ -14,6 +14,8 @@ import { findManyLfxMemberships } from '@crowd/data-access-layer/src/lfx_members import { ActivityDisplayService } from '@crowd/integrations' import { IIntegrationResult, IntegrationResultState } from '@crowd/types' +import { QUEUE_CLIENT } from '@/serverless/utils/queueService' + import { AttributeData } from '../attributes/attribute' import SequelizeFilterUtils from '../utils/sequelizeFilterUtils' @@ -58,7 +60,7 @@ class ActivityRepository { data.platform = data.platform.toLowerCase() } - const ids = await insertActivities([ + const ids = await insertActivities(QUEUE_CLIENT(), [ { type: data.type, timestamp: data.timestamp, diff --git a/backend/src/services/activityService.ts b/backend/src/services/activityService.ts index e92f1c12ce..89be5fb3d3 100644 --- a/backend/src/services/activityService.ts +++ b/backend/src/services/activityService.ts @@ -23,7 +23,7 @@ import { IMemberIdentity, IntegrationResultType, PlatformType, SegmentData } fro import { IRepositoryOptions } from '@/database/repositories/IRepositoryOptions' import OrganizationRepository from '@/database/repositories/organizationRepository' -import { getDataSinkWorkerEmitter } from '@/serverless/utils/queueService' +import { QUEUE_CLIENT, getDataSinkWorkerEmitter } from '@/serverless/utils/queueService' import { GITHUB_CONFIG, IS_DEV_ENV, IS_TEST_ENV } from '../conf' import ActivityRepository from '../database/repositories/activityRepository' @@ -174,7 +174,7 @@ export default class ActivityService extends LoggerBase { ) record = await ActivityRepository.create(data, repositoryOptions) - await insertActivities([{ ...data, id: record.id }], true) + await insertActivities(QUEUE_CLIENT(), [{ ...data, id: record.id }], true) // Only track activity's platform and timestamp and memberId. It is completely annonymous. telemetryTrack( diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 90de0b5ed9..78632c4162 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1510,6 +1510,9 @@ importers: '@crowd/questdb': specifier: workspace:* version: link:../questdb + '@crowd/queue': + specifier: workspace:* + version: link:../queue '@crowd/redis': specifier: workspace:* version: link:../redis @@ -8941,8 +8944,8 @@ snapshots: dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 - '@aws-sdk/client-sso-oidc': 3.572.0(@aws-sdk/client-sts@3.572.0) - '@aws-sdk/client-sts': 3.572.0 + '@aws-sdk/client-sso-oidc': 3.572.0 + '@aws-sdk/client-sts': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) '@aws-sdk/core': 3.572.0 '@aws-sdk/credential-provider-node': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0)(@aws-sdk/client-sts@3.572.0) '@aws-sdk/middleware-host-header': 3.567.0 @@ -9185,11 +9188,11 @@ snapshots: transitivePeerDependencies: - aws-crt - '@aws-sdk/client-sso-oidc@3.572.0(@aws-sdk/client-sts@3.572.0)': + '@aws-sdk/client-sso-oidc@3.572.0': dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 - '@aws-sdk/client-sts': 3.572.0 + '@aws-sdk/client-sts': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) '@aws-sdk/core': 3.572.0 '@aws-sdk/credential-provider-node': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0)(@aws-sdk/client-sts@3.572.0) '@aws-sdk/middleware-host-header': 3.567.0 @@ -9228,7 +9231,6 @@ snapshots: '@smithy/util-utf8': 2.3.0 tslib: 2.6.2 transitivePeerDependencies: - - '@aws-sdk/client-sts' - aws-crt '@aws-sdk/client-sso-oidc@3.687.0(@aws-sdk/client-sts@3.687.0)': @@ -9449,11 +9451,11 @@ snapshots: transitivePeerDependencies: - aws-crt - '@aws-sdk/client-sts@3.572.0': + '@aws-sdk/client-sts@3.572.0(@aws-sdk/client-sso-oidc@3.572.0)': dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 - '@aws-sdk/client-sso-oidc': 3.572.0(@aws-sdk/client-sts@3.572.0) + '@aws-sdk/client-sso-oidc': 3.572.0 '@aws-sdk/core': 3.572.0 '@aws-sdk/credential-provider-node': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0)(@aws-sdk/client-sts@3.572.0) '@aws-sdk/middleware-host-header': 3.567.0 @@ -9492,6 +9494,7 @@ snapshots: '@smithy/util-utf8': 2.3.0 tslib: 2.6.2 transitivePeerDependencies: + - '@aws-sdk/client-sso-oidc' - aws-crt '@aws-sdk/client-sts@3.687.0': @@ -9651,7 +9654,7 @@ snapshots: '@aws-sdk/credential-provider-ini@3.572.0(@aws-sdk/client-sso-oidc@3.572.0)(@aws-sdk/client-sts@3.572.0)': dependencies: - '@aws-sdk/client-sts': 3.572.0 + '@aws-sdk/client-sts': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) '@aws-sdk/credential-provider-env': 3.568.0 '@aws-sdk/credential-provider-process': 3.572.0 '@aws-sdk/credential-provider-sso': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) @@ -9818,7 +9821,7 @@ snapshots: '@aws-sdk/credential-provider-web-identity@3.568.0(@aws-sdk/client-sts@3.572.0)': dependencies: - '@aws-sdk/client-sts': 3.572.0 + '@aws-sdk/client-sts': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) '@aws-sdk/types': 3.567.0 '@smithy/property-provider': 2.2.0 '@smithy/types': 2.12.0 @@ -10087,7 +10090,7 @@ snapshots: '@aws-sdk/token-providers@3.572.0(@aws-sdk/client-sso-oidc@3.572.0)': dependencies: - '@aws-sdk/client-sso-oidc': 3.572.0(@aws-sdk/client-sts@3.572.0) + '@aws-sdk/client-sso-oidc': 3.572.0 '@aws-sdk/types': 3.567.0 '@smithy/property-provider': 2.2.0 '@smithy/shared-ini-file-loader': 2.4.0 diff --git a/scripts/scaffold.yaml b/scripts/scaffold.yaml index d2633251da..70efd14a64 100644 --- a/scripts/scaffold.yaml +++ b/scripts/scaffold.yaml @@ -161,6 +161,23 @@ services: networks: - crowd-bridge + kafka-connect: + build: + context: scaffold/kafka-connect + restart: unless-stopped + entrypoint: + - connect-standalone + - /etc/kafka-connect/worker-local.properties + - /etc/kafka-connect/console-local-sink.properties + - /etc/kafka-connect/questdb-local-sink.properties + volumes: + - kafka-connect-dev:/storage + - ./scaffold/kafka-connect/worker-local.properties:/etc/kafka-connect/worker-local.properties + - ./scaffold/kafka-connect/console-local-sink.properties:/etc/kafka-connect/console-local-sink.properties + - ./scaffold/kafka-connect/questdb-local-sink.properties:/etc/kafka-connect/questdb-local-sink.properties + networks: + - crowd-bridge + temporal: build: context: scaffold/temporal @@ -182,3 +199,4 @@ volumes: opensearch-dev: s3-dev: redis-dev: + kafka-connect-dev: diff --git a/scripts/scaffold/kafka-connect/Dockerfile b/scripts/scaffold/kafka-connect/Dockerfile new file mode 100644 index 0000000000..8a9b6f1815 --- /dev/null +++ b/scripts/scaffold/kafka-connect/Dockerfile @@ -0,0 +1,13 @@ +FROM confluentinc/cp-kafka-connect:7.8.0-2-ubi8 + +USER root + +RUN yum install -y jq findutils unzip + +RUN confluent-hub install snowflakeinc/snowflake-kafka-connector:2.5.0 --no-prompt +RUN confluent-hub install questdb/kafka-questdb-connector:0.12 --no-prompt + +VOLUME /storage + +USER appuser + diff --git a/scripts/scaffold/kafka-connect/build-docker-image.sh b/scripts/scaffold/kafka-connect/build-docker-image.sh new file mode 100755 index 0000000000..c2214ba3cd --- /dev/null +++ b/scripts/scaffold/kafka-connect/build-docker-image.sh @@ -0,0 +1,21 @@ +#!/usr/bin/env bash + +set -euo pipefail + +TAG="sjc.ocir.io/axbydjxa5zuh/kafka-connect:$(date +%s)" +readonly TAG + +docker build -t "${TAG}" . + +echo "----------------------------------------" +echo "Image built with tag: ${TAG}" +echo "----------------------------------------" +echo -n "Type 'y' and press Enter to push the image to the registry. Ctrl+C to cancel: " +read -r PUSH +if [ "${PUSH}" = "y" ]; then + echo "Pushing image to the registry..." + echo "----------------------------------------" + docker push "${TAG}" +else + echo "Skipping push" +fi diff --git a/scripts/scaffold/kafka-connect/console-local-sink.properties b/scripts/scaffold/kafka-connect/console-local-sink.properties new file mode 100644 index 0000000000..1db005773b --- /dev/null +++ b/scripts/scaffold/kafka-connect/console-local-sink.properties @@ -0,0 +1,6 @@ +name=console-sink +connector.class=FileStreamSinkConnector +tasks.max=1 +topics=activities +value.converter=org.apache.kafka.connect.json.JsonConverter +value.converter.schemas.enable=false diff --git a/scripts/scaffold/kafka-connect/questdb-local-sink.properties b/scripts/scaffold/kafka-connect/questdb-local-sink.properties new file mode 100644 index 0000000000..bbcea5f70e --- /dev/null +++ b/scripts/scaffold/kafka-connect/questdb-local-sink.properties @@ -0,0 +1,12 @@ +name=questdb-sink +client.conf.string=http::addr=questdb:9000; +topics=activities +table=activities +connector.class=io.questdb.kafka.QuestDBSinkConnector +value.converter=org.apache.kafka.connect.json.JsonConverter +include.key=false +key.converter=org.apache.kafka.connect.storage.StringConverter +timestamp.field.name=timestamp +timestamp.string.fields=createdAt,updatedAt +timestamp.string.format=yyyy-MM-ddTHH:mm:ss.SSSZ +value.converter.schemas.enable=false diff --git a/scripts/scaffold/kafka-connect/worker-local.properties b/scripts/scaffold/kafka-connect/worker-local.properties new file mode 100644 index 0000000000..6d34eb1a09 --- /dev/null +++ b/scripts/scaffold/kafka-connect/worker-local.properties @@ -0,0 +1,13 @@ +bootstrap.servers=kafka:9092 +group.id=kafka-connect + +key.converter=org.apache.kafka.connect.storage.StringConverter +value.converter=org.apache.kafka.connect.json.JsonConverter +internal.key.converter=org.apache.kafka.connect.storage.StringConverter +internal.value.converter=org.apache.kafka.connect.json.JsonConverter +key.converter.schemas.enable=true +value.converter.schemas.enable=true + +offset.storage.file.filename=/storage/connect.offsets +offset.flush.interval.ms=10000 +plugin.path=/usr/share/java,/usr/share/filestream-connectors,/usr/share/confluent-hub-components diff --git a/services/apps/activities_worker/src/activities/createConversations.ts b/services/apps/activities_worker/src/activities/createConversations.ts index d71120c80d..b58d311a2c 100644 --- a/services/apps/activities_worker/src/activities/createConversations.ts +++ b/services/apps/activities_worker/src/activities/createConversations.ts @@ -182,7 +182,7 @@ export async function createConversations(): Promise if (toUpdate.length > 0) { for (const batch of partition(toUpdate, 100)) { try { - const results = await insertActivities(batch, true) + const results = await insertActivities(svc.queue, batch, true) activitiesAddedToConversations += results.length } catch (err) { svc.log.error(err, 'Error linking activities to conversations') @@ -209,7 +209,7 @@ async function getRows(qdbConn: DbConnOrTx, current: Date): Promise { activities_to_check_for_parentId AS ( SELECT * FROM activities child - WHERE deletedAt IS NULL + WHERE deletedAt IS NULL AND sourceParentId IS NOT NULL AND conversationId IS NULL AND timestamp > dateadd('w', -1, $(limit)) @@ -236,7 +236,7 @@ async function getMinActivityTimestamp(qdbConn: DbConnOrTx): Promise { dataSinkWorkerEmitter, redis, temporal, + queueClient, log, ) diff --git a/services/apps/data_sink_worker/src/queue/index.ts b/services/apps/data_sink_worker/src/queue/index.ts index e0f394b1a0..6be2e81c94 100644 --- a/services/apps/data_sink_worker/src/queue/index.ts +++ b/services/apps/data_sink_worker/src/queue/index.ts @@ -17,7 +17,7 @@ import DataSinkService from '../service/dataSink.service' export class WorkerQueueReceiver extends PrioritizedQueueReciever { constructor( level: QueuePriorityLevel, - client: IQueue, + private readonly client: IQueue, private readonly pgConn: DbConnection, private readonly qdbConn: DbConnection, private readonly searchSyncWorkerEmitter: SearchSyncWorkerEmitter, @@ -49,6 +49,7 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever { this.dataSinkWorkerEmitter, this.redisClient, this.temporal, + this.client, this.log, ) diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 7f39aeff65..3c567f05b8 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -34,6 +34,7 @@ import SettingsRepository from '@crowd/data-access-layer/src/old/apps/data_sink_ import { DEFAULT_ACTIVITY_TYPE_SETTINGS, GithubActivityType } from '@crowd/integrations' import { GitActivityType } from '@crowd/integrations/src/integrations/git/types' import { Logger, LoggerBase, getChildLogger } from '@crowd/logging' +import { IQueue } from '@crowd/queue' import { RedisClient } from '@crowd/redis' import { Client as TemporalClient } from '@crowd/temporal' import { @@ -60,6 +61,7 @@ export default class ActivityService extends LoggerBase { private readonly searchSyncWorkerEmitter: SearchSyncWorkerEmitter, private readonly redisClient: RedisClient, private readonly temporal: TemporalClient, + private readonly client: IQueue, parentLog: Logger, ) { super(parentLog) @@ -102,7 +104,7 @@ export default class ActivityService extends LoggerBase { this.log.debug('Creating an activity in QuestDB!') try { - await insertActivities([ + await insertActivities(this.client, [ { id: activity.id, timestamp: activity.timestamp.toISOString(), @@ -185,7 +187,7 @@ export default class ActivityService extends LoggerBase { // use insert instead of update to avoid using pg protocol with questdb try { - await insertActivities([ + await insertActivities(this.client, [ { id, memberId: toUpdate.memberId || original.memberId, @@ -592,6 +594,7 @@ export default class ActivityService extends LoggerBase { this.searchSyncWorkerEmitter, this.redisClient, this.temporal, + this.client, this.log, ) const txIntegrationRepo = new IntegrationRepository(txStore, this.log) @@ -1303,6 +1306,7 @@ export default class ActivityService extends LoggerBase { }) => { await updateActivities( this.qdbStore.connection(), + this.client, async (activity) => ({ attributes: { ...gitAttributes, @@ -1352,6 +1356,7 @@ export default class ActivityService extends LoggerBase { await updateActivities( this.qdbStore.connection(), + this.client, async () => ({ sourceParentId: activity.sourceId, }), diff --git a/services/apps/data_sink_worker/src/service/dataSink.service.ts b/services/apps/data_sink_worker/src/service/dataSink.service.ts index fc1b175849..2406c25fa7 100644 --- a/services/apps/data_sink_worker/src/service/dataSink.service.ts +++ b/services/apps/data_sink_worker/src/service/dataSink.service.ts @@ -7,6 +7,7 @@ import { } from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.data' import DataSinkRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo' import { Logger, LoggerBase, getChildLogger } from '@crowd/logging' +import { IQueue } from '@crowd/queue' import { RedisClient } from '@crowd/redis' import telemetry from '@crowd/telemetry' import { Client as TemporalClient } from '@crowd/temporal' @@ -36,6 +37,7 @@ export default class DataSinkService extends LoggerBase { private readonly dataSinkWorkerEmitter: DataSinkWorkerEmitter, private readonly redisClient: RedisClient, private readonly temporal: TemporalClient, + private readonly client: IQueue, parentLog: Logger, ) { super(parentLog) @@ -218,6 +220,7 @@ export default class DataSinkService extends LoggerBase { this.searchSyncWorkerEmitter, this.redisClient, this.temporal, + this.client, this.log, ) const activityData = data.data as IActivityData diff --git a/services/apps/entity_merging_worker/src/activities/members.ts b/services/apps/entity_merging_worker/src/activities/members.ts index 15086731ea..3b133ea565 100644 --- a/services/apps/entity_merging_worker/src/activities/members.ts +++ b/services/apps/entity_merging_worker/src/activities/members.ts @@ -38,7 +38,7 @@ export async function moveActivitiesBetweenMembers( if (!memberExists) { return } - await moveActivitiesToNewMember(svc.questdbSQL, primaryId, secondaryId, tenantId) + await moveActivitiesToNewMember(svc.questdbSQL, svc.queue, primaryId, secondaryId, tenantId) } export async function moveActivitiesWithIdentityToAnotherMember( @@ -67,6 +67,7 @@ export async function moveActivitiesWithIdentityToAnotherMember( )) { await moveIdentityActivitiesToNewMember( svc.questdbSQL, + svc.queue, tenantId, fromId, toId, diff --git a/services/apps/entity_merging_worker/src/activities/organizations.ts b/services/apps/entity_merging_worker/src/activities/organizations.ts index 066c631ba7..1cc7aaea57 100644 --- a/services/apps/entity_merging_worker/src/activities/organizations.ts +++ b/services/apps/entity_merging_worker/src/activities/organizations.ts @@ -29,7 +29,7 @@ export async function moveActivitiesBetweenOrgs( secondaryId: string, tenantId: string, ): Promise { - await moveActivitiesToNewOrg(svc.questdbSQL, primaryId, secondaryId, tenantId) + await moveActivitiesToNewOrg(svc.questdbSQL, svc.queue, primaryId, secondaryId, tenantId) } export async function recalculateActivityAffiliationsOfOrganizationSynchronous( diff --git a/services/apps/entity_merging_worker/src/main.ts b/services/apps/entity_merging_worker/src/main.ts index ae4b77cb6f..3a8788b7cc 100644 --- a/services/apps/entity_merging_worker/src/main.ts +++ b/services/apps/entity_merging_worker/src/main.ts @@ -26,6 +26,9 @@ const options: Options = { opensearch: { enabled: false, }, + queue: { + enabled: true, + }, } export const svc = new ServiceWorker(config, options) diff --git a/services/apps/profiles_worker/src/activities/member/memberUpdate.ts b/services/apps/profiles_worker/src/activities/member/memberUpdate.ts index 7df365fa9d..b6a045f7b7 100644 --- a/services/apps/profiles_worker/src/activities/member/memberUpdate.ts +++ b/services/apps/profiles_worker/src/activities/member/memberUpdate.ts @@ -10,7 +10,12 @@ a given member. */ export async function updateMemberAffiliations(input: MemberUpdateInput): Promise { try { - await runMemberAffiliationsUpdate(svc.postgres.writer, svc.questdbSQL, input.member.id) + await runMemberAffiliationsUpdate( + svc.postgres.writer, + svc.questdbSQL, + svc.queue, + input.member.id, + ) } catch (err) { throw new Error(err) } diff --git a/services/apps/profiles_worker/src/main.ts b/services/apps/profiles_worker/src/main.ts index 0495f9cc62..547fe08e40 100644 --- a/services/apps/profiles_worker/src/main.ts +++ b/services/apps/profiles_worker/src/main.ts @@ -25,6 +25,9 @@ const options: Options = { opensearch: { enabled: false, }, + queue: { + enabled: true, + }, } export const svc = new ServiceWorker(config, options) diff --git a/services/libs/data-access-layer/package.json b/services/libs/data-access-layer/package.json index fe12e57c32..819be3150d 100644 --- a/services/libs/data-access-layer/package.json +++ b/services/libs/data-access-layer/package.json @@ -14,6 +14,7 @@ "@crowd/logging": "workspace:*", "@crowd/questdb": "workspace:*", "@crowd/redis": "workspace:*", + "@crowd/queue": "workspace:*", "@crowd/telemetry": "workspace:*", "@crowd/types": "workspace:*", "@questdb/nodejs-client": "~3.0.0", diff --git a/services/libs/data-access-layer/src/activities/ilp.ts b/services/libs/data-access-layer/src/activities/ilp.ts index d4980a546e..de4705a7f2 100644 --- a/services/libs/data-access-layer/src/activities/ilp.ts +++ b/services/libs/data-access-layer/src/activities/ilp.ts @@ -1,207 +1,59 @@ -import { Sender } from '@questdb/nodejs-client' +import { pick } from 'lodash' +import moment from 'moment' import { generateUUIDv4 } from '@crowd/common' -import { getClientILP } from '@crowd/questdb' +import { getServiceChildLogger } from '@crowd/logging' +import { ACTIVITIES_QUEUE_SETTINGS, IQueue, QueueEmitter } from '@crowd/queue' import telemetry from '@crowd/telemetry' import { IDbActivityCreateData } from '../old/apps/data_sink_worker/repo/activity.data' -const ilp: Sender = getClientILP() +import { ACTIVITY_ALL_COLUMNS } from './sql' + +const logger = getServiceChildLogger('insert-activities') export async function insertActivities( + queueClient: IQueue, activities: IDbActivityCreateData[], update = false, ): Promise { - const ids: string[] = [] - const now = Date.now() + const now = moment().toISOString() - if (activities.length > 0) { - for (const activity of activities) { + const toInsert = activities + .map((activity) => { const id = activity.id || generateUUIDv4() - ids.push(id) - - let createdAt - if (activity.createdAt) { - const res = new Date(activity.createdAt) - // log.info({ createdAt: res }, 'insertActivities.createdAt') - createdAt = res.getTime() - } else { - createdAt = now - } - - let updatedAt - if (update || !activity.updatedAt) { - updatedAt = now - } else { - const res = new Date(activity.updatedAt) - updatedAt = res.getTime() - } - - const row = ilp - .table('activities') - .symbol('tenantId', activity.tenantId) - .symbol('segmentId', activity.segmentId) - .symbol('platform', activity.platform) - .stringColumn('id', id) - .timestampColumn('createdAt', createdAt, 'ms') - .timestampColumn('updatedAt', updatedAt, 'ms') - .stringColumn('attributes', objectToBytes(tryToUnwrapAttributes(activity.attributes))) - .booleanColumn('member_isTeamMember', activity.isTeamMemberActivity || false) - .booleanColumn('member_isBot', activity.isBotActivity || false) - - if ( - activity.platform === 'git' || - activity.platform === 'github' || - activity.platform === 'gitlab' - ) { - if (typeof activity.attributes['isMainBranch'] === 'boolean') { - row.booleanColumn('gitIsMainBranch', activity.attributes['isMainBranch'] as boolean) - } - - if (typeof activity.attributes['isIndirectFork'] === 'boolean') { - row.booleanColumn('gitIsIndirectFork', activity.attributes['isIndirectFork'] as boolean) - } - - if (typeof activity.attributes['insertions'] === 'number') { - row.intColumn('gitInsertions', activity.attributes['insertions'] as number) - } else if (typeof activity.attributes['additions'] === 'number') { - row.intColumn('gitInsertions', activity.attributes['additions'] as number) - } - - if (typeof activity.attributes['deletions'] === 'number') { - row.intColumn('gitDeletions', activity.attributes['deletions'] as number) - } - - if (typeof activity.attributes['lines'] === 'number') { - row.intColumn('gitLines', activity.attributes['lines'] as number) - } - - if (typeof activity.attributes['isMerge'] === 'boolean') { - row.booleanColumn('gitIsMerge', activity.attributes['isMerge'] as boolean) - } - } - - if (activity.type) { - row.stringColumn('type', activity.type) - } - - if (typeof activity.isContribution === 'boolean') { - row.booleanColumn('isContribution', activity.isContribution) - } - - if (activity.sourceId) { - row.stringColumn('sourceId', activity.sourceId) - } - - if (activity.username) { - row.stringColumn('username', activity.username) - } - - if (typeof activity.score === 'number') { - row.intColumn('score', activity.score) - } - - if (activity.sourceParentId) { - row.stringColumn('sourceParentId', activity.sourceParentId) - } - - if (activity.organizationId) { - row.stringColumn('organizationId', activity.organizationId) - } - - if (activity.conversationId) { - row.stringColumn('conversationId', activity.conversationId) - } - - if (activity.channel) { - row.stringColumn('channel', activity.channel) - } - - if (activity.importHash) { - row.stringColumn('importHash', activity.importHash) - } - - if (activity.body) { - row.stringColumn('body', activity.body.slice(0, 2000)) - } - if (activity.title) { - row.stringColumn('title', activity.title) - } - - if (activity.url) { - row.stringColumn('url', activity.url) - } - - if (activity.parentId) { - row.stringColumn('parentId', activity.parentId) - } - - if (activity.memberId) { - row.stringColumn('memberId', activity.memberId) - } - - if (activity.username) { - row.stringColumn('username', activity.username) - } - - if (activity.objectMemberId) { - row.stringColumn('objectMemberId', activity.objectMemberId) - } - - if (activity.objectMemberUsername) { - row.stringColumn('objectMemberUsername', activity.objectMemberUsername) - } - - if (activity.sentiment) { - if (activity.sentiment.label) { - row.stringColumn('sentimentLabel', activity.sentiment.label) - } - - if (typeof activity.sentiment.sentiment === 'number') { - row.intColumn('sentimentScore', activity.sentiment.sentiment) - } - - if (typeof activity.sentiment.negative === 'number') { - row.floatColumn('sentimentScoreNegative', activity.sentiment.negative) - } - - if (typeof activity.sentiment.mixed === 'number') { - row.floatColumn('sentimentScoreMixed', activity.sentiment.mixed) - } - - if (typeof activity.sentiment.positive === 'number') { - row.floatColumn('sentimentScorePositive', activity.sentiment.positive) - } - - if (typeof activity.sentiment.neutral === 'number') { - row.floatColumn('sentimentScoreNeutral', activity.sentiment.neutral) - } - } - - if (activity.createdById) { - row.stringColumn('createdById', activity.createdById) - } - - if (activity.updatedById) { - row.stringColumn('updatedById', activity.updatedById) - } - - let timestamp - if (activity.timestamp) { - const res = new Date(activity.timestamp) - // log.info({ timestamp: res }, 'insertActivities.timestamp') - timestamp = res.getTime() - } else { - timestamp = now - } - - await row.at(timestamp, 'ms') - telemetry.increment('questdb.insert_activity', 1) - } + return { + // we keep these ones in front of `...activity` because these fields might exist in the activity object + member_isBot: activity.isBotActivity || false, + member_isTeamMember: activity.isTeamMemberActivity || false, + gitIsMainBranch: activity.attributes['isMainBranch'], + gitIsIndirectFork: activity.attributes['isIndirectFork'], + gitInsertions: activity.attributes['insertions'] || activity.attributes['additions'], + gitDeletions: activity.attributes['deletions'], + gitLines: activity.attributes['lines'], + gitIsMerge: activity.attributes['isMerge'], + + ...activity, + + id, + updatedAt: update || !activity.updatedAt ? now : moment(activity.updatedAt).toISOString(), + createdAt: activity.createdAt ? moment(activity.createdAt).toISOString() : now, + timestamp: activity.timestamp ? moment(activity.timestamp).toISOString() : now, + attributes: objectToBytes(tryToUnwrapAttributes(activity.attributes)), + body: activity.body?.slice(0, 2000), + } + }) + .map((activity) => pick(activity, ACTIVITY_ALL_COLUMNS)) // otherwise QuestDB insert fails + + const emitter = new QueueEmitter(queueClient, ACTIVITIES_QUEUE_SETTINGS, logger) + + for (const row of toInsert) { + await emitter.sendMessage(generateUUIDv4(), row, generateUUIDv4()) } + telemetry.increment('questdb.insert_activity', activities.length) - return ids + return toInsert.map((activity) => activity.id) } function objectToBytes(input: object): string { diff --git a/services/libs/data-access-layer/src/activities/sql.ts b/services/libs/data-access-layer/src/activities/sql.ts index c7ae39e4a0..2e9d7bd2d5 100644 --- a/services/libs/data-access-layer/src/activities/sql.ts +++ b/services/libs/data-access-layer/src/activities/sql.ts @@ -96,6 +96,51 @@ const ACTIVITY_UPDATABLE_COLUMNS: ActivityColumn[] = [ 'gitLines', 'gitIsMerge', ] + +export const ACTIVITY_ALL_COLUMNS: ActivityColumn[] = [ + 'id', + 'type', + 'timestamp', + 'platform', + 'isContribution', + 'score', + 'sourceId', + 'createdAt', + 'updatedAt', + 'deletedAt', + 'memberId', + 'parentId', + 'tenantId', + 'createdById', + 'updatedById', + 'sourceParentId', + 'conversationId', + 'attributes', + 'title', + 'body', + 'channel', + 'url', + 'username', + 'objectMemberId', + 'objectMemberUsername', + 'segmentId', + 'organizationId', + 'sentimentLabel', + 'sentimentScore', + 'sentimentScoreMixed', + 'sentimentScoreNeutral', + 'sentimentScoreNegative', + 'sentimentScorePositive', + 'member_isBot', + 'member_isTeamMember', + 'gitIsMainBranch', + 'gitIsIndirectFork', + 'gitLines', + 'gitInsertions', + 'gitDeletions', + 'gitIsMerge', +] + export async function updateActivity( conn: DbConnOrTx, id: string, diff --git a/services/libs/data-access-layer/src/activities/update.ts b/services/libs/data-access-layer/src/activities/update.ts index 0a16410196..f456a3cb03 100644 --- a/services/libs/data-access-layer/src/activities/update.ts +++ b/services/libs/data-access-layer/src/activities/update.ts @@ -2,6 +2,7 @@ import QueryStream from 'pg-query-stream' import { DbConnOrTx } from '@crowd/database' import { getServiceChildLogger, timer } from '@crowd/logging' +import { IQueue } from '@crowd/queue' import { IDbActivityCreateData } from '../old/apps/data_sink_worker/repo/activity.data' import { formatQuery } from '../queryExecutor' @@ -20,20 +21,41 @@ export async function streamActivities( const qs = new QueryStream(`SELECT * FROM activities WHERE ${whereClause}`) const t = timer(logger, `query activities with ${whereClause}`) - const res = await qdb.stream(qs, async (stream) => { - for await (const item of stream) { - t.end() - const activity = item as unknown as IDbActivityCreateData + return new Promise((resolve, reject) => { + let processedAllRows = false + let streamResult = null - await onActivity(activity) + function tryFinish() { + if (processedAllRows && streamResult) { + resolve(streamResult) + } } + + qdb + .stream(qs, async (stream) => { + for await (const item of stream) { + t.end() + + const activity = item as unknown as IDbActivityCreateData + + await onActivity(activity) + } + + processedAllRows = true + tryFinish() + }) + .then((res) => { + streamResult = res + tryFinish() + }) + .catch(reject) }) - return res } export async function updateActivities( qdb: DbConnOrTx, + queueClient: IQueue, mapActivity: (activity: IDbActivityCreateData) => Promise>, where: string, params?: Record, @@ -42,6 +64,7 @@ export async function updateActivities( qdb, async (activity) => { await insertActivities( + queueClient, [ { ...activity, diff --git a/services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts b/services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts index ad5fdbccaf..b11083f73d 100644 --- a/services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts +++ b/services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts @@ -1,4 +1,5 @@ import { DbConnOrTx, DbStore } from '@crowd/database' +import { IQueue } from '@crowd/queue' import { IActivityIdentity, IMemberIdentity, MergeActionState, MergeActionStep } from '@crowd/types' import { updateActivities } from '../../../activities/update' @@ -41,12 +42,14 @@ export async function findMemberById(db: DbStore, primaryId: string, tenantId: s export async function moveActivitiesToNewMember( qdb: DbConnOrTx, + queueClient: IQueue, primaryId: string, secondaryId: string, tenantId: string, ) { await updateActivities( qdb, + queueClient, async () => ({ memberId: primaryId }), `"memberId" = $(memberId) AND "tenantId" = $(tenantId)`, { @@ -119,6 +122,7 @@ export async function getIdentitiesWithActivity( export async function moveIdentityActivitiesToNewMember( db: DbConnOrTx, + queueClient: IQueue, tenantId: string, fromId: string, toId: string, @@ -127,6 +131,7 @@ export async function moveIdentityActivitiesToNewMember( ) { await updateActivities( db, + queueClient, async (activity: IDbActivityCreateData) => ({ ...activity, memberId: toId }), formatQuery( ` diff --git a/services/libs/data-access-layer/src/old/apps/entity_merging_worker/orgs.ts b/services/libs/data-access-layer/src/old/apps/entity_merging_worker/orgs.ts index 89373114f6..8557acc83a 100644 --- a/services/libs/data-access-layer/src/old/apps/entity_merging_worker/orgs.ts +++ b/services/libs/data-access-layer/src/old/apps/entity_merging_worker/orgs.ts @@ -1,4 +1,5 @@ import { DbConnOrTx, DbStore } from '@crowd/database' +import { IQueue } from '@crowd/queue' import { updateActivities } from '../../../activities/update' @@ -33,12 +34,14 @@ export async function deleteOrganizationById(db: DbStore, organizationId: string export async function moveActivitiesToNewOrg( qdb: DbConnOrTx, + queueClient: IQueue, primaryId: string, secondaryId: string, tenantId: string, ) { await updateActivities( qdb, + queueClient, async () => ({ organizationId: primaryId }), ` "organizationId" = $(organizationId) AND "tenantId" = $(tenantId) diff --git a/services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts b/services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts index 4adae9fdf1..8b93d10f6e 100644 --- a/services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts +++ b/services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts @@ -4,6 +4,7 @@ import QueryStream from 'pg-query-stream' import { getDefaultTenantId } from '@crowd/common' import { DbConnOrTx, DbStore } from '@crowd/database' import { getServiceChildLogger } from '@crowd/logging' +import { IQueue } from '@crowd/queue' import { insertActivities } from '../../../activities' import { findMemberAffiliations } from '../../../member_segment_affiliations' @@ -18,6 +19,7 @@ const tenantId = getDefaultTenantId() export async function runMemberAffiliationsUpdate( pgDb: DbStore, qDb: DbConnOrTx, + queueClient: IQueue, memberId: string, ) { const qx = pgpQx(pgDb.connection()) @@ -155,7 +157,7 @@ export async function runMemberAffiliationsUpdate( } } - await insertActivities([activity], true) + await insertActivities(queueClient, [activity], true) } const qs = new QueryStream( diff --git a/services/libs/queue/src/types.ts b/services/libs/queue/src/types.ts index bd2842417e..ac2976cfdd 100644 --- a/services/libs/queue/src/types.ts +++ b/services/libs/queue/src/types.ts @@ -76,6 +76,7 @@ export enum CrowdQueue { INTEGRATION_STREAM_WORKER = 'integration-stream-worker', DATA_SINK_WORKER = 'data-sink-worker', SEARCH_SYNC_WORKER = 'search-sync-worker', + ACTIVITIES = 'activities', } export enum QueueVendor { diff --git a/services/libs/queue/src/vendors/kafka/config.ts b/services/libs/queue/src/vendors/kafka/config.ts index 5b0e2b2f14..cfd964c994 100644 --- a/services/libs/queue/src/vendors/kafka/config.ts +++ b/services/libs/queue/src/vendors/kafka/config.ts @@ -45,6 +45,14 @@ export const INTEGRATION_STREAM_WORKER_QUEUE_SETTINGS: IKafkaChannelConfig = { }, } +export const ACTIVITIES_QUEUE_SETTINGS: IKafkaChannelConfig = { + name: CrowdQueue.ACTIVITIES, + replicationFactor: 1, + partitions: { + default: 1, + }, +} + export const configMap = { [CrowdQueue.INTEGRATION_RUN_WORKER]: INTEGRATION_RUN_WORKER_QUEUE_SETTINGS, [CrowdQueue.INTEGRATION_STREAM_WORKER]: INTEGRATION_STREAM_WORKER_QUEUE_SETTINGS,