Skip to content

Commit

Permalink
cache worker first version
Browse files Browse the repository at this point in the history
  • Loading branch information
epipav committed Jan 12, 2024
1 parent 925fbd1 commit 9944b22
Show file tree
Hide file tree
Showing 34 changed files with 1,297 additions and 97 deletions.
26 changes: 24 additions & 2 deletions backend/src/bin/scripts/get-dashboard-data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,29 @@ if (parameters.help || !parameters.tenantId || !parameters.segmentId) {

const end = moment().endOf('day')

const data = await CubeJsRepository.getActiveMembers(cubejsService, start, end, null, {}, false)
console.log('START DATE: ')
console.log(start.toISOString())
console.log('END DATE: ')
console.log(end.toISOString())

const previousStartDate = moment().subtract(13, 'days').startOf('day')
const previousEndDate = moment().subtract(7, 'days').endOf('day')

console.log("PREVIOUS START DATE:")
console.log(previousStartDate.toISOString())
console.log("PREVIOUS END DATE: ")
console.log(previousEndDate.toISOString())

const data = await CubeJsRepository.getNewActivities(
cubejsService,
start,
end,
'day',
[],
{},
{ [CubeDimension.ACTIVITY_DATE]: CubeOrderDirection.ASC },
true,
)

console.log(data)

Expand All @@ -81,7 +103,7 @@ if (parameters.help || !parameters.tenantId || !parameters.segmentId) {
start,
end,
null,
[CubeDimension.ACTIVITY_SENTIMENT_MOOD],
[CubeDimension.ACTIVITY_TYPE, CubeDimension.ACTIVITY_PLATFORM],
{
platform: 'github',
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ SELECT
END::VARCHAR(8) AS "sentimentMood",
a."organizationId",
a."segmentId",
a."conversationId"
a."conversationId",
a."createdAt"
FROM activities a
WHERE a."deletedAt" IS NULL
;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
alter table "segments" drop column "dashboardCacheLastRefreshedAt";
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
alter table "segments"
add column "dashboardCacheLastRefreshedAt" timestamp with time zone null;
63 changes: 63 additions & 0 deletions scripts/services/cache-worker.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
version: '3.1'

x-env-args: &env-args
DOCKER_BUILDKIT: 1
NODE_ENV: docker
SERVICE: cache-worker
CROWD_TEMPORAL_TASKQUEUE: cache
SHELL: /bin/sh

services:
cache-worker:
build:
context: ../../
dockerfile: ./scripts/services/docker/Dockerfile.cache_worker
command: 'pnpm run start'
working_dir: /usr/crowd/app/services/apps/cache_worker
env_file:
- ../../backend/.env.dist.local
- ../../backend/.env.dist.composed
- ../../backend/.env.override.local
- ../../backend/.env.override.composed
environment:
<<: *env-args
restart: always
networks:
- crowd-bridge

cache-worker-dev:
build:
context: ../../
dockerfile: ./scripts/services/docker/Dockerfile.cache_worker
command: 'pnpm run dev'
working_dir: /usr/crowd/app/services/apps/cache_worker
# user: '${USER_ID}:${GROUP_ID}'
env_file:
- ../../backend/.env.dist.local
- ../../backend/.env.dist.composed
- ../../backend/.env.override.local
- ../../backend/.env.override.composed
environment:
<<: *env-args
hostname: cache-worker
networks:
- crowd-bridge
volumes:
- ../../services/libs/alerting/src:/usr/crowd/app/services/libs/alerting/src
- ../../services/libs/common/src:/usr/crowd/app/services/libs/common/src
- ../../services/libs/common_services/src:/usr/crowd/app/services/libs/common_services/src
- ../../services/libs/conversations/src:/usr/crowd/app/services/libs/conversations/src
- ../../services/libs/database/src:/usr/crowd/app/services/libs/database/src
- ../../services/libs/integrations/src:/usr/crowd/app/services/libs/integrations/src
- ../../services/libs/ioc/src:/usr/crowd/app/services/libs/ioc/src
- ../../services/libs/logging/src:/usr/crowd/app/services/libs/logging/src
- ../../services/libs/opensearch/src:/usr/crowd/app/services/libs/opensearch/src
- ../../services/libs/redis/src:/usr/crowd/app/services/libs/redis/src
- ../../services/libs/sentiment/src:/usr/crowd/app/services/libs/sentiment/src
- ../../services/libs/sqs/src:/usr/crowd/app/services/libs/sqs/src
- ../../services/libs/types/src:/usr/crowd/app/services/libs/types/src
- ../../services/apps/cache_worker/src:/usr/crowd/app/services/apps/cache_worker/src

networks:
crowd-bridge:
external: true
18 changes: 18 additions & 0 deletions scripts/services/docker/Dockerfile.cache_worker
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
FROM node:16-alpine as builder

WORKDIR /usr/crowd/app
RUN corepack enable && apk add --update --no-cache python3 build-base && ln -sf python3 /usr/bin/python && python3 -m ensurepip && pip3 install --no-cache --upgrade pip setuptools

COPY ./pnpm-workspace.yaml ./pnpm-lock.yaml ./
COPY ./services ./services
RUN pnpm i --frozen-lockfile

FROM node:16-bookworm-slim as runner

WORKDIR /usr/crowd/app
RUN corepack enable && apt update && apt install -y ca-certificates --no-install-recommends && rm -rf /var/lib/apt/lists/*

COPY --from=builder /usr/crowd/app/node_modules ./node_modules
COPY --from=builder /usr/crowd/app/services/libs ./services/libs
COPY --from=builder /usr/crowd/app/services/archetypes/ ./services/archetypes
COPY --from=builder /usr/crowd/app/services/apps/cache_worker/ ./services/apps/cache_worker
22 changes: 22 additions & 0 deletions scripts/services/docker/Dockerfile.cache_worker.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
**/.git
**/node_modules
**/venv*
**/.webpack
**/.serverless
**/.cubestore
**/.env
**/.env.*
**/.idea
**/.vscode
**/dist
backend/server-config/
backend/util/
backend/src/serverless/microservices/python
.vscode/
.github/
frontend/
scripts/
.flake8
*.md
Makefile
backend/
6 changes: 3 additions & 3 deletions services/apps/cache_worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
"version": "1.0.0",
"private": true,
"scripts": {
"start": "CROWD_TEMPORAL_TASKQUEUE=merge-suggestions SERVICE=cache-worker TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/main.ts",
"start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=merge-suggestions SERVICE=cache-worker TS_NODE_TRANSPILE_ONLY=true LOG_LEVEL=trace node --inspect=0.0.0.0:9232 -r tsconfig-paths/register -r ts-node/register src/main.ts",
"start:debug": "CROWD_TEMPORAL_TASKQUEUE=merge-suggestions SERVICE=cache-worker TS_NODE_TRANSPILE_ONLY=true LOG_LEVEL=trace node --inspect=0.0.0.0:9232 -r tsconfig-paths/register -r ts-node/register src/main.ts",
"start": "CROWD_TEMPORAL_TASKQUEUE=cache SERVICE=cache-worker TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/main.ts",
"start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=cache SERVICE=cache-worker TS_NODE_TRANSPILE_ONLY=true LOG_LEVEL=trace node --inspect=0.0.0.0:9232 -r tsconfig-paths/register -r ts-node/register src/main.ts",
"start:debug": "CROWD_TEMPORAL_TASKQUEUE=cache SERVICE=cache-worker TS_NODE_TRANSPILE_ONLY=true LOG_LEVEL=trace node --inspect=0.0.0.0:9232 -r tsconfig-paths/register -r ts-node/register src/main.ts",
"dev:local": "./node_modules/.bin/nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local",
"dev": "./node_modules/.bin/nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug",
"lint": "./node_modules/.bin/eslint --ext .ts src --max-warnings=0",
Expand Down
38 changes: 38 additions & 0 deletions services/apps/cache_worker/src/activities.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import {
getAllTenants,
getAllSegments,
getProjectLeafSegments,
getProjectGroupLeafSegments,
} from './activities/getTenantSegmentInfo'

import {
getDashboardCacheLastRefreshedAt,
getDefaultSegment,
getNewMembers,
getActiveMembers,
getNewOrganizations,
getActiveOrganizations,
getActivities,
saveToCache,
getActivePlatforms,
findNewActivityPlatforms,
updateMemberMergeSuggestionsLastGeneratedAt,
} from './activities/dashboard-cache/refreshDashboardCache'

export {
getAllTenants,
getAllSegments,
getProjectLeafSegments,
getProjectGroupLeafSegments,
getDashboardCacheLastRefreshedAt,
getDefaultSegment,
getNewMembers,
getActiveMembers,
getNewOrganizations,
getActiveOrganizations,
getActivities,
saveToCache,
getActivePlatforms,
findNewActivityPlatforms,
updateMemberMergeSuggestionsLastGeneratedAt,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
import { svc } from '../../main'
import { ICubeQueryParams, IDashboardData, ISegment } from 'types'
import SegmentRepository from 'repo/segment.repo'
import { CubeJsRepository, CubeJsService } from '@crowd/cubejs'
import { RedisCache } from '@crowd/redis'
import { DashboardTimeframe } from 'enums'
import moment from 'moment'
import IntegrationRepository from 'repo/integration.repo'
import ActivityRepository from 'repo/activity.repo'

export async function getDashboardCacheLastRefreshedAt(segmentId: string): Promise<string> {
const segmentRepo = new SegmentRepository(svc.postgres.writer.connection(), svc.log)
return segmentRepo.getDashboardCacheLastRefreshedAt(segmentId)
}

export async function getDefaultSegment(tenantId: string): Promise<ISegment> {
const segmentRepo = new SegmentRepository(svc.postgres.writer.connection(), svc.log)
return segmentRepo.getDefaultSegment(tenantId)
}

export async function getActivePlatforms(leafSegmentIds: string[]): Promise<string[]> {
const integrationRepo = new IntegrationRepository(svc.postgres.writer.connection(), svc.log)
return integrationRepo.findActivePlatforms(leafSegmentIds)
}

export async function findNewActivityPlatforms(
dashboardLastRefreshedAt: string,
leafSegmentIds: string[],
): Promise<string[]> {
const activityRepo = new ActivityRepository(svc.postgres.writer.connection(), svc.log)
return activityRepo.findNewActivityPlatforms(dashboardLastRefreshedAt, leafSegmentIds)
}

export async function updateMemberMergeSuggestionsLastGeneratedAt(
segmentId: string,
): Promise<void> {
const segmentRepo = new SegmentRepository(svc.postgres.writer.connection(), svc.log)
await segmentRepo.updateDashboardCacheLastRefreshedAt(segmentId)
}

export async function getNewMembers<T>(params: ICubeQueryParams) {
const cjs = new CubeJsService()
await cjs.init(params.tenantId, params.segmentIds)

let result: T
try {
result = await CubeJsRepository.getNewMembers(
cjs,
moment(params.startDate),
moment(params.endDate),
params.granularity,
{
platform: params.platform,
},
params.rawResult,
)
} catch (err) {
throw new Error(err)
}

return result
}

export async function getActiveMembers<T>(params: ICubeQueryParams) {
const cjs = new CubeJsService()
await cjs.init(params.tenantId, params.segmentIds)

let result: T
try {
result = await CubeJsRepository.getActiveMembers(
cjs,
moment(params.startDate),
moment(params.endDate),
params.granularity,
{
platform: params.platform,
},
params.rawResult,
)
} catch (err) {
throw new Error(err)
}

return result
}

export async function getNewOrganizations<T>(params: ICubeQueryParams) {
const cjs = new CubeJsService()
await cjs.init(params.tenantId, params.segmentIds)

let result: T
try {
result = await CubeJsRepository.getNewOrganizations(
cjs,
moment(params.startDate),
moment(params.endDate),
params.granularity,
{
platform: params.platform,
},
params.rawResult,
)
} catch (err) {
throw new Error(err)
}

return result
}

export async function getActiveOrganizations<T>(params: ICubeQueryParams) {
const cjs = new CubeJsService()
await cjs.init(params.tenantId, params.segmentIds)

let result: T
try {
result = await CubeJsRepository.getActiveOrganizations(
cjs,
moment(params.startDate),
moment(params.endDate),
params.granularity,
{
platform: params.platform,
},
params.rawResult,
)
} catch (err) {
throw new Error(err)
}

return result
}

export async function getActivities<T>(params: ICubeQueryParams) {
const cjs = new CubeJsService()
await cjs.init(params.tenantId, params.segmentIds)

let result: T
try {
result = await CubeJsRepository.getNewActivities(
cjs,
moment(params.startDate),
moment(params.endDate),
params.granularity,
params.dimensions,
{
platform: params.platform,
},
params.order,
params.rawResult,
)
} catch (err) {
throw new Error(err)
}

return result
}

export async function saveToCache(
tenantId: string,
segmentId: string,
timeframe: DashboardTimeframe,
cacheData: IDashboardData,
platform?: string,
): Promise<void> {
const redisCache = new RedisCache(`dashboard-cache`, svc.redis, svc.log)
let key = `${tenantId}:${segmentId}:${timeframe}`
if (platform) {
key += `:${platform}`
}
await redisCache.set(key, JSON.stringify(cacheData))
}
Loading

0 comments on commit 9944b22

Please sign in to comment.