4 changes: 3 additions & 1 deletion backend/src/database/repositories/activityRepository.ts
@@ -14,6 +14,8 @@ import { findManyLfxMemberships } from '@crowd/data-access-layer/src/lfx_members
import { ActivityDisplayService } from '@crowd/integrations'
import { IIntegrationResult, IntegrationResultState } from '@crowd/types'

+import { QUEUE_CLIENT } from '@/serverless/utils/queueService'
+
import { AttributeData } from '../attributes/attribute'
import SequelizeFilterUtils from '../utils/sequelizeFilterUtils'

@@ -58,7 +60,7 @@ class ActivityRepository
data.platform = data.platform.toLowerCase()
}

-const ids = await insertActivities([
+const ids = await insertActivities(QUEUE_CLIENT(), [

🛠️ Refactor suggestion

Passing QUEUE_CLIENT() to insertActivities.

Good shift toward a queue-based approach. Consider adding robust error handling for partial insertions or queue outages, e.g. logging failures or implementing rollback logic.
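A minimal sketch of that guard, assuming insertActivities rejects when the queue is unreachable; `log` and `activity` are illustrative stand-ins for whatever logger and payload are in scope at this call site, not code from the PR:

// Hedged sketch: guard the queue-backed insert instead of letting a
// queue outage silently drop the activity.
try {
  const ids = await insertActivities(QUEUE_CLIENT(), [activity])
  return ids
} catch (err) {
  log.error({ err }, 'insertActivities failed; activity was not enqueued')
  throw err // or fall back to a durable dead-letter store / rollback
}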

{
type: data.type,
timestamp: data.timestamp,
4 changes: 2 additions & 2 deletions backend/src/services/activityService.ts
@@ -23,7 +23,7 @@ import { IMemberIdentity, IntegrationResultType, PlatformType, SegmentData } fro

import { IRepositoryOptions } from '@/database/repositories/IRepositoryOptions'
import OrganizationRepository from '@/database/repositories/organizationRepository'
-import { getDataSinkWorkerEmitter } from '@/serverless/utils/queueService'
+import { QUEUE_CLIENT, getDataSinkWorkerEmitter } from '@/serverless/utils/queueService'

import { GITHUB_CONFIG, IS_DEV_ENV, IS_TEST_ENV } from '../conf'
import ActivityRepository from '../database/repositories/activityRepository'
@@ -174,7 +174,7 @@ export default class ActivityService extends LoggerBase {
)

record = await ActivityRepository.create(data, repositoryOptions)
-await insertActivities([{ ...data, id: record.id }], true)
+await insertActivities(QUEUE_CLIENT(), [{ ...data, id: record.id }], true)

🛠️ Refactor suggestion

Add error handling or retry logic for queue insertion.
The call to insertActivities(QUEUE_CLIENT(), …) has no explicit fallback if the queue is unreachable or the insertion fails. Consider adding retry logic or error handling so critical data isn't lost when insertion fails.
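One way to add that retry is a small backoff helper wrapped around the call. This is a sketch under the assumption that insertActivities rejects on queue failure, not code from the PR:

// Sketch: retry with exponential backoff for transient queue failures.
async function withRetry<T>(fn: () => Promise<T>, attempts = 3, baseDelayMs = 200): Promise<T> {
  let lastErr: unknown
  for (let i = 0; i < attempts; i++) {
    try {
      return await fn()
    } catch (err) {
      lastErr = err
      if (i < attempts - 1) {
        // back off 200 ms, 400 ms, 800 ms, ... before the next attempt
        await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** i))
      }
    }
  }
  throw lastErr
}

// Usage at the call site from the diff above:
// await withRetry(() => insertActivities(QUEUE_CLIENT(), [{ ...data, id: record.id }], true))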


// Only track activity's platform and timestamp and memberId. It is completely annonymous.
telemetryTrack(
23 changes: 13 additions & 10 deletions pnpm-lock.yaml

Some generated files are not rendered by default.

18 changes: 18 additions & 0 deletions scripts/scaffold.yaml
@@ -161,6 +161,23 @@ services:
    networks:
      - crowd-bridge

+  kafka-connect:
+    build:
+      context: scaffold/kafka-connect
+    restart: unless-stopped
+    entrypoint:
+      - connect-standalone
+      - /etc/kafka-connect/worker-local.properties
+      - /etc/kafka-connect/console-local-sink.properties
+      - /etc/kafka-connect/questdb-local-sink.properties
+    volumes:
+      - kafka-connect-dev:/storage
+      - ./scaffold/kafka-connect/worker-local.properties:/etc/kafka-connect/worker-local.properties
+      - ./scaffold/kafka-connect/console-local-sink.properties:/etc/kafka-connect/console-local-sink.properties
+      - ./scaffold/kafka-connect/questdb-local-sink.properties:/etc/kafka-connect/questdb-local-sink.properties
+    networks:
+      - crowd-bridge

Comment on lines +164 to +180

🛠️ Refactor suggestion

Add essential service configurations for Kafka Connect

The Kafka Connect service configuration needs improvements for production readiness:

  1. No health check defined
  2. No resource limits set
  3. No explicit dependency on Kafka
  4. Using standalone mode (not recommended for production)

Consider these improvements:

   kafka-connect:
     build:
       context: scaffold/kafka-connect
     restart: unless-stopped
+    depends_on:
+      kafka:
+        condition: service_started
+    healthcheck:
+      test: ["CMD-SHELL", "curl -f http://localhost:8083/connectors || exit 1"]
+      interval: 30s
+      timeout: 10s
+      retries: 3
+      start_period: 30s
+    deploy:
+      resources:
+        limits:
+          memory: 1G
+        reservations:
+          memory: 512M
     entrypoint:
       - connect-standalone

Also, consider switching to distributed mode for better scalability:

     entrypoint:
-      - connect-standalone
+      - connect-distributed
       - /etc/kafka-connect/worker-local.properties
-      - /etc/kafka-connect/console-local-sink.properties
-      - /etc/kafka-connect/questdb-local-sink.properties
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-  kafka-connect:
-    build:
-      context: scaffold/kafka-connect
-    restart: unless-stopped
-    entrypoint:
-      - connect-standalone
-      - /etc/kafka-connect/worker-local.properties
-      - /etc/kafka-connect/console-local-sink.properties
-      - /etc/kafka-connect/questdb-local-sink.properties
-    volumes:
-      - kafka-connect-dev:/storage
-      - ./scaffold/kafka-connect/worker-local.properties:/etc/kafka-connect/worker-local.properties
-      - ./scaffold/kafka-connect/console-local-sink.properties:/etc/kafka-connect/console-local-sink.properties
-      - ./scaffold/kafka-connect/questdb-local-sink.properties:/etc/kafka-connect/questdb-local-sink.properties
-    networks:
-      - crowd-bridge
+  kafka-connect:
+    build:
+      context: scaffold/kafka-connect
+    restart: unless-stopped
+    depends_on:
+      kafka:
+        condition: service_started
+    healthcheck:
+      test: ["CMD-SHELL", "curl -f http://localhost:8083/connectors || exit 1"]
+      interval: 30s
+      timeout: 10s
+      retries: 3
+      start_period: 30s
+    deploy:
+      resources:
+        limits:
+          memory: 1G
+        reservations:
+          memory: 512M
+    entrypoint:
+      - connect-distributed
+      - /etc/kafka-connect/worker-local.properties
+    volumes:
+      - kafka-connect-dev:/storage
+      - ./scaffold/kafka-connect/worker-local.properties:/etc/kafka-connect/worker-local.properties
+      - ./scaffold/kafka-connect/console-local-sink.properties:/etc/kafka-connect/console-local-sink.properties
+      - ./scaffold/kafka-connect/questdb-local-sink.properties:/etc/kafka-connect/questdb-local-sink.properties
+    networks:
+      - crowd-bridge

  temporal:
    build:
      context: scaffold/temporal
@@ -182,3 +199,4 @@ volumes:
  opensearch-dev:
  s3-dev:
  redis-dev:
+  kafka-connect-dev:
13 changes: 13 additions & 0 deletions scripts/scaffold/kafka-connect/Dockerfile
@@ -0,0 +1,13 @@
FROM confluentinc/cp-kafka-connect:7.8.0-2-ubi8

USER root

RUN yum install -y jq findutils unzip

RUN confluent-hub install snowflakeinc/snowflake-kafka-connector:2.5.0 --no-prompt
RUN confluent-hub install questdb/kafka-questdb-connector:0.12 --no-prompt
Comment on lines +5 to +8

🛠️ Refactor suggestion

Optimize Dockerfile layers and security

  1. Combine RUN commands to reduce image layers and cleanup package manager cache
  2. Pin specific versions of system packages for reproducibility
  3. Verify connector versions for compatibility

Consider this optimization:

-RUN yum install -y jq findutils unzip
-
-RUN confluent-hub install snowflakeinc/snowflake-kafka-connector:2.5.0 --no-prompt
-RUN confluent-hub install questdb/kafka-questdb-connector:0.12 --no-prompt
+RUN yum install -y jq-1.6-* findutils-4.6.0-* unzip-6.0-* \
+    && yum clean all \
+    && rm -rf /var/cache/yum \
+    && confluent-hub install snowflakeinc/snowflake-kafka-connector:2.5.0 --no-prompt \
+    && confluent-hub install questdb/kafka-questdb-connector:0.12 --no-prompt

Committable suggestion skipped: line range outside the PR's diff.


VOLUME /storage

USER appuser

21 changes: 21 additions & 0 deletions scripts/scaffold/kafka-connect/build-docker-image.sh
@@ -0,0 +1,21 @@
#!/usr/bin/env bash

set -euo pipefail

TAG="sjc.ocir.io/axbydjxa5zuh/kafka-connect:$(date +%s)"
readonly TAG
Comment on lines +5 to +6

🛠️ Refactor suggestion

Parameterize the registry path for better flexibility

The registry path sjc.ocir.io/axbydjxa5zuh is hardcoded, which could cause issues when deploying to different environments.

Consider using environment variables:

-TAG="sjc.ocir.io/axbydjxa5zuh/kafka-connect:$(date +%s)"
+REGISTRY_PATH="${REGISTRY_PATH:-sjc.ocir.io/axbydjxa5zuh}"
+TAG="${REGISTRY_PATH}/kafka-connect:$(date +%s)"
📝 Committable suggestion


Suggested change
TAG="sjc.ocir.io/axbydjxa5zuh/kafka-connect:$(date +%s)"
readonly TAG
REGISTRY_PATH="${REGISTRY_PATH:-sjc.ocir.io/axbydjxa5zuh}"
TAG="${REGISTRY_PATH}/kafka-connect:$(date +%s)"
readonly TAG


docker build -t "${TAG}" .

echo "----------------------------------------"
echo "Image built with tag: ${TAG}"
echo "----------------------------------------"
echo -n "Type 'y' and press Enter to push the image to the registry. Ctrl+C to cancel: "
read -r PUSH
if [ "${PUSH}" = "y" ]; then
  echo "Pushing image to the registry..."
  echo "----------------------------------------"
  docker push "${TAG}"
else
  echo "Skipping push"
fi
6 changes: 6 additions & 0 deletions scripts/scaffold/kafka-connect/console-local-sink.properties
@@ -0,0 +1,6 @@
name=console-sink
connector.class=FileStreamSinkConnector
tasks.max=1
topics=activities
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
12 changes: 12 additions & 0 deletions scripts/scaffold/kafka-connect/questdb-local-sink.properties
@@ -0,0 +1,12 @@
name=questdb-sink
client.conf.string=http::addr=questdb:9000;
topics=activities
table=activities
connector.class=io.questdb.kafka.QuestDBSinkConnector
Comment on lines +1 to +5

🛠️ Refactor suggestion

Add error handling and monitoring configurations

The connector configuration lacks essential error handling and monitoring settings.

Add these configurations:

 name=questdb-sink
 client.conf.string=http::addr=questdb:9000;
 topics=activities
 table=activities
 connector.class=io.questdb.kafka.QuestDBSinkConnector
+errors.tolerance=all
+errors.deadletterqueue.topic.name=dlq-activities
+errors.deadletterqueue.topic.replication.factor=1
+errors.deadletterqueue.context.headers.enable=true
+errors.log.enable=true
+errors.log.include.messages=true
📝 Committable suggestion


Suggested change
 name=questdb-sink
 client.conf.string=http::addr=questdb:9000;
 topics=activities
 table=activities
 connector.class=io.questdb.kafka.QuestDBSinkConnector
+errors.tolerance=all
+errors.deadletterqueue.topic.name=dlq-activities
+errors.deadletterqueue.topic.replication.factor=1
+errors.deadletterqueue.context.headers.enable=true
+errors.log.enable=true
+errors.log.include.messages=true

value.converter=org.apache.kafka.connect.json.JsonConverter
include.key=false
key.converter=org.apache.kafka.connect.storage.StringConverter
timestamp.field.name=timestamp
timestamp.string.fields=createdAt,updatedAt
timestamp.string.format=yyyy-MM-ddTHH:mm:ss.SSSZ
value.converter.schemas.enable=false
13 changes: 13 additions & 0 deletions scripts/scaffold/kafka-connect/worker-local.properties
@@ -0,0 +1,13 @@
bootstrap.servers=kafka:9092
group.id=kafka-connect
Comment on lines +1 to +2

⚠️ Potential issue

Add security configurations for the Kafka connection

The current configuration lacks security settings for the Kafka connection.

Add these security configurations:

 bootstrap.servers=kafka:9092
 group.id=kafka-connect
+security.protocol=SSL
+ssl.truststore.location=/etc/kafka/secrets/kafka.connect.truststore.jks
+ssl.truststore.password=${file:/etc/kafka/secrets/connect-secrets:truststore-password}
+ssl.keystore.location=/etc/kafka/secrets/kafka.connect.keystore.jks
+ssl.keystore.password=${file:/etc/kafka/secrets/connect-secrets:keystore-password}
+ssl.key.password=${file:/etc/kafka/secrets/connect-secrets:key-password}

Committable suggestion skipped: line range outside the PR's diff.


key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/storage/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/share/java,/usr/share/filestream-connectors,/usr/share/confluent-hub-components
@@ -182,7 +182,7 @@ export async function createConversations(): Promise<ICreateConversationsResult>
if (toUpdate.length > 0) {
for (const batch of partition(toUpdate, 100)) {
try {
-const results = await insertActivities(batch, true)
+const results = await insertActivities(svc.queue, batch, true)
activitiesAddedToConversations += results.length
} catch (err) {
svc.log.error(err, 'Error linking activities to conversations')
@@ -209,7 +209,7 @@ async function getRows(qdbConn: DbConnOrTx, current: Date): Promise<any[]> {
activities_to_check_for_parentId AS (
SELECT *
FROM activities child
-WHERE deletedAt IS NULL
+WHERE deletedAt IS NULL
AND sourceParentId IS NOT NULL
AND conversationId IS NULL
AND timestamp > dateadd('w', -1, $(limit))
@@ -236,7 +236,7 @@ async function getMinActivityTimestamp(qdbConn: DbConnOrTx): Promise<string | nu
const query = `
select first(timestamp) as minTimestamp
from activities
-where
+where
deletedAt is null and
sourceParentId is not null and
conversationId is null;
3 changes: 3 additions & 0 deletions services/apps/activities_worker/src/main.ts
@@ -26,6 +26,9 @@ const options: Options = {
  opensearch: {
    enabled: false,
  },
+  queue: {
+    enabled: true,
+  },
 }

export const svc = new ServiceWorker(config, options)
1 change: 1 addition & 0 deletions services/apps/data_sink_worker/src/bin/process-results.ts
@@ -57,6 +57,7 @@ setImmediate(async () => {
    dataSinkWorkerEmitter,
    redis,
    temporal,
+    queueClient,
    log,
  )

3 changes: 2 additions & 1 deletion services/apps/data_sink_worker/src/queue/index.ts
@@ -17,7 +17,7 @@ import DataSinkService from '../service/dataSink.service'
export class WorkerQueueReceiver extends PrioritizedQueueReciever {
constructor(
level: QueuePriorityLevel,
-client: IQueue,
+private readonly client: IQueue,
private readonly pgConn: DbConnection,
private readonly qdbConn: DbConnection,
private readonly searchSyncWorkerEmitter: SearchSyncWorkerEmitter,
@@ -49,6 +49,7 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever {
this.dataSinkWorkerEmitter,
this.redisClient,
this.temporal,
+this.client,
this.log,
)

@@ -34,6 +34,7 @@ import SettingsRepository from '@crowd/data-access-layer/src/old/apps/data_sink_
import { DEFAULT_ACTIVITY_TYPE_SETTINGS, GithubActivityType } from '@crowd/integrations'
import { GitActivityType } from '@crowd/integrations/src/integrations/git/types'
import { Logger, LoggerBase, getChildLogger } from '@crowd/logging'
+import { IQueue } from '@crowd/queue'
import { RedisClient } from '@crowd/redis'
import { Client as TemporalClient } from '@crowd/temporal'
import {
@@ -60,6 +61,7 @@ export default class ActivityService extends LoggerBase {
private readonly searchSyncWorkerEmitter: SearchSyncWorkerEmitter,
private readonly redisClient: RedisClient,
private readonly temporal: TemporalClient,
+private readonly client: IQueue,
parentLog: Logger,
) {
super(parentLog)
@@ -102,7 +104,7 @@

this.log.debug('Creating an activity in QuestDB!')
try {
-await insertActivities([
+await insertActivities(this.client, [
{
id: activity.id,
timestamp: activity.timestamp.toISOString(),
@@ -185,7 +187,7 @@

// use insert instead of update to avoid using pg protocol with questdb
try {
-await insertActivities([
+await insertActivities(this.client, [
{
id,
memberId: toUpdate.memberId || original.memberId,
@@ -592,6 +594,7 @@
this.searchSyncWorkerEmitter,
this.redisClient,
this.temporal,
+this.client,
this.log,
)
const txIntegrationRepo = new IntegrationRepository(txStore, this.log)
@@ -1303,6 +1306,7 @@
}) => {
await updateActivities(
this.qdbStore.connection(),
+this.client,
async (activity) => ({
attributes: {
...gitAttributes,
@@ -1352,6 +1356,7 @@

await updateActivities(
this.qdbStore.connection(),
+this.client,
async () => ({
sourceParentId: activity.sourceId,
}),
@@ -7,6 +7,7 @@ import {
} from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.data'
import DataSinkRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo'
import { Logger, LoggerBase, getChildLogger } from '@crowd/logging'
+import { IQueue } from '@crowd/queue'
import { RedisClient } from '@crowd/redis'
import telemetry from '@crowd/telemetry'
import { Client as TemporalClient } from '@crowd/temporal'
@@ -36,6 +37,7 @@ export default class DataSinkService extends LoggerBase {
private readonly dataSinkWorkerEmitter: DataSinkWorkerEmitter,
private readonly redisClient: RedisClient,
private readonly temporal: TemporalClient,
+private readonly client: IQueue,
parentLog: Logger,
) {
super(parentLog)
@@ -218,6 +220,7 @@
this.searchSyncWorkerEmitter,
this.redisClient,
this.temporal,
+this.client,
this.log,
)
const activityData = data.data as IActivityData
@@ -38,7 +38,7 @@ export async function moveActivitiesBetweenMembers(
if (!memberExists) {
return
}
-await moveActivitiesToNewMember(svc.questdbSQL, primaryId, secondaryId, tenantId)
+await moveActivitiesToNewMember(svc.questdbSQL, svc.queue, primaryId, secondaryId, tenantId)

💡 Codebase verification


Queue error handling needs to be improved in the activity-merging flow

The implementation of insertActivities uses a QueueEmitter to send messages but lacks proper error handling for queue failures. While the queue client is correctly typed as IQueue, handling for queue unavailability is missing. Specific issues:

  • No try-catch block around emitter.sendMessage() calls
  • No fallback mechanism if queue operations fail
  • No logging of queue-related errors
🔗 Analysis chain

Ensure proper error handling if the queue is unavailable.

You are now passing svc.queue to moveActivitiesToNewMember. Confirm that any internal logic gracefully handles a null or undefined queue. Consider adding logging or a fallback if queue usage fails.


🏁 Scripts executed

The following scripts were executed for the analysis:

#!/bin/bash
# Find the implementation of moveActivitiesToNewMember and its call sites
ast-grep --pattern 'function moveActivitiesToNewMember($_) {
  $$$
}'
rg "moveActivitiesToNewMember" -A 5

# Inspect the functions that consume the queue parameter
cat services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts
cat services/libs/data-access-layer/src/activities/update.ts
cat services/libs/data-access-layer/src/activities/ilp.ts
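A sketch of the guard this comment asks for at the call site, assuming the call rejects when the queue is unavailable (svc.log is already used this way elsewhere in this worker); the error message is illustrative:

// Illustrative: fail loudly with context rather than silently dropping the merge.
try {
  await moveActivitiesToNewMember(svc.questdbSQL, svc.queue, primaryId, secondaryId, tenantId)
} catch (err) {
  svc.log.error({ err, primaryId, secondaryId }, 'Error moving activities to new member')
  throw err
}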

}

export async function moveActivitiesWithIdentityToAnotherMember(
@@ -67,6 +67,7 @@ export async function moveActivitiesWithIdentityToAnotherMember(
)) {
await moveIdentityActivitiesToNewMember(
svc.questdbSQL,
+svc.queue,
tenantId,
fromId,
toId,
@@ -29,7 +29,7 @@ export async function moveActivitiesBetweenOrgs(
secondaryId: string,
tenantId: string,
): Promise<void> {
-await moveActivitiesToNewOrg(svc.questdbSQL, primaryId, secondaryId, tenantId)
+await moveActivitiesToNewOrg(svc.questdbSQL, svc.queue, primaryId, secondaryId, tenantId)
}

export async function recalculateActivityAffiliationsOfOrganizationSynchronous(