From 9944b2229ee9651227e05956425ef22442bafa1e Mon Sep 17 00:00:00 2001 From: anil Date: Fri, 12 Jan 2024 08:42:14 +0100 Subject: [PATCH] cache worker first version --- backend/src/bin/scripts/get-dashboard-data.ts | 26 +- .../R__cubejs-materialized-views.sql | 3 +- ...4923216__dashboardCacheLastRefreshedAt.sql | 1 + ...4923216__dashboardCacheLastRefreshedAt.sql | 2 + scripts/services/cache-worker.yaml | 63 +++ .../services/docker/Dockerfile.cache_worker | 18 + .../Dockerfile.cache_worker.dockerignore | 22 ++ services/apps/cache_worker/package.json | 6 +- services/apps/cache_worker/src/activities.ts | 38 ++ .../dashboard-cache/refreshDashboardCache.ts | 171 ++++++++ .../src/activities/getTenantSegmentInfo.ts | 34 ++ services/apps/cache_worker/src/enums.ts | 5 + services/apps/cache_worker/src/main.ts | 1 + .../cache_worker/src/repo/activity.repo.ts | 40 ++ .../cache_worker/src/repo/integration.repo.ts | 37 ++ .../cache_worker/src/repo/segment.repo.ts | 173 +++++++++ .../apps/cache_worker/src/repo/tenant.repo.ts | 33 ++ services/apps/cache_worker/src/types.ts | 110 ++++++ services/apps/cache_worker/src/workflows.ts | 5 + .../src/workflows/refreshDashboardCache.ts | 367 ++++++++++++++++++ ...spawnDashboardCacheRefreshForAllTenants.ts | 113 ++++++ services/archetypes/worker/src/index.ts | 2 + services/libs/cubejs/src/enums.ts | 3 + .../libs/cubejs/src/metrics/activeMembers.ts | 6 +- .../cubejs/src/metrics/activeOrganizations.ts | 6 +- services/libs/cubejs/src/metrics/index.ts | 1 - .../libs/cubejs/src/metrics/newActivities.ts | 8 +- .../src/metrics/newActivitiesTimeseries.ts | 69 ---- .../libs/cubejs/src/metrics/newMembers.ts | 15 +- .../cubejs/src/metrics/newOrganizations.ts | 6 +- services/libs/cubejs/src/repository.ts | 2 - services/libs/cubejs/src/types.ts | 2 +- services/libs/types/src/temporal/cache.ts | 5 + services/libs/types/src/temporal/index.ts | 1 + 34 files changed, 1297 insertions(+), 97 deletions(-) create mode 100644 backend/src/database/migrations/U1704923216__dashboardCacheLastRefreshedAt.sql create mode 100644 backend/src/database/migrations/V1704923216__dashboardCacheLastRefreshedAt.sql create mode 100644 scripts/services/cache-worker.yaml create mode 100644 scripts/services/docker/Dockerfile.cache_worker create mode 100644 scripts/services/docker/Dockerfile.cache_worker.dockerignore create mode 100644 services/apps/cache_worker/src/activities.ts create mode 100644 services/apps/cache_worker/src/activities/dashboard-cache/refreshDashboardCache.ts create mode 100644 services/apps/cache_worker/src/activities/getTenantSegmentInfo.ts create mode 100644 services/apps/cache_worker/src/enums.ts create mode 100644 services/apps/cache_worker/src/repo/activity.repo.ts create mode 100644 services/apps/cache_worker/src/repo/integration.repo.ts create mode 100644 services/apps/cache_worker/src/repo/segment.repo.ts create mode 100644 services/apps/cache_worker/src/repo/tenant.repo.ts create mode 100644 services/apps/cache_worker/src/types.ts create mode 100644 services/apps/cache_worker/src/workflows.ts create mode 100644 services/apps/cache_worker/src/workflows/refreshDashboardCache.ts create mode 100644 services/apps/cache_worker/src/workflows/spawnDashboardCacheRefreshForAllTenants.ts delete mode 100644 services/libs/cubejs/src/metrics/newActivitiesTimeseries.ts create mode 100644 services/libs/types/src/temporal/cache.ts diff --git a/backend/src/bin/scripts/get-dashboard-data.ts b/backend/src/bin/scripts/get-dashboard-data.ts index 4fc05c31d9..06e83a00e5 100644 --- 
a/backend/src/bin/scripts/get-dashboard-data.ts +++ b/backend/src/bin/scripts/get-dashboard-data.ts @@ -72,7 +72,29 @@ if (parameters.help || !parameters.tenantId || !parameters.segmentId) { const end = moment().endOf('day') - const data = await CubeJsRepository.getActiveMembers(cubejsService, start, end, null, {}, false) + console.log('START DATE: ') + console.log(start.toISOString()) + console.log('END DATE: ') + console.log(end.toISOString()) + + const previousStartDate = moment().subtract(13, 'days').startOf('day') + const previousEndDate = moment().subtract(7, 'days').endOf('day') + + console.log("PREVIOUS START DATE:") + console.log(previousStartDate.toISOString()) + console.log("PREVIOUS END DATE: ") + console.log(previousEndDate.toISOString()) + + const data = await CubeJsRepository.getNewActivities( + cubejsService, + start, + end, + 'day', + [], + {}, + { [CubeDimension.ACTIVITY_DATE]: CubeOrderDirection.ASC }, + true, + ) console.log(data) @@ -81,7 +103,7 @@ if (parameters.help || !parameters.tenantId || !parameters.segmentId) { start, end, null, - [CubeDimension.ACTIVITY_SENTIMENT_MOOD], + [CubeDimension.ACTIVITY_TYPE, CubeDimension.ACTIVITY_PLATFORM], { platform: 'github', }, diff --git a/backend/src/database/migrations/R__cubejs-materialized-views.sql b/backend/src/database/migrations/R__cubejs-materialized-views.sql index 280a7dc0f0..81cc8cd3f8 100644 --- a/backend/src/database/migrations/R__cubejs-materialized-views.sql +++ b/backend/src/database/migrations/R__cubejs-materialized-views.sql @@ -38,7 +38,8 @@ SELECT END::VARCHAR(8) AS "sentimentMood", a."organizationId", a."segmentId", - a."conversationId" + a."conversationId", + a."createdAt" FROM activities a WHERE a."deletedAt" IS NULL ; diff --git a/backend/src/database/migrations/U1704923216__dashboardCacheLastRefreshedAt.sql b/backend/src/database/migrations/U1704923216__dashboardCacheLastRefreshedAt.sql new file mode 100644 index 0000000000..eb19081fcc --- /dev/null +++ b/backend/src/database/migrations/U1704923216__dashboardCacheLastRefreshedAt.sql @@ -0,0 +1 @@ +alter table "segments" drop column "dashboardCacheLastRefreshedAt"; \ No newline at end of file diff --git a/backend/src/database/migrations/V1704923216__dashboardCacheLastRefreshedAt.sql b/backend/src/database/migrations/V1704923216__dashboardCacheLastRefreshedAt.sql new file mode 100644 index 0000000000..9eae562ae9 --- /dev/null +++ b/backend/src/database/migrations/V1704923216__dashboardCacheLastRefreshedAt.sql @@ -0,0 +1,2 @@ +alter table "segments" +add column "dashboardCacheLastRefreshedAt" timestamp with time zone null; \ No newline at end of file diff --git a/scripts/services/cache-worker.yaml b/scripts/services/cache-worker.yaml new file mode 100644 index 0000000000..d8b29fb6dd --- /dev/null +++ b/scripts/services/cache-worker.yaml @@ -0,0 +1,63 @@ +version: '3.1' + +x-env-args: &env-args + DOCKER_BUILDKIT: 1 + NODE_ENV: docker + SERVICE: cache-worker + CROWD_TEMPORAL_TASKQUEUE: cache + SHELL: /bin/sh + +services: + cache-worker: + build: + context: ../../ + dockerfile: ./scripts/services/docker/Dockerfile.cache_worker + command: 'pnpm run start' + working_dir: /usr/crowd/app/services/apps/cache_worker + env_file: + - ../../backend/.env.dist.local + - ../../backend/.env.dist.composed + - ../../backend/.env.override.local + - ../../backend/.env.override.composed + environment: + <<: *env-args + restart: always + networks: + - crowd-bridge + + cache-worker-dev: + build: + context: ../../ + dockerfile: ./scripts/services/docker/Dockerfile.cache_worker + 
command: 'pnpm run dev' + working_dir: /usr/crowd/app/services/apps/cache_worker + # user: '${USER_ID}:${GROUP_ID}' + env_file: + - ../../backend/.env.dist.local + - ../../backend/.env.dist.composed + - ../../backend/.env.override.local + - ../../backend/.env.override.composed + environment: + <<: *env-args + hostname: cache-worker + networks: + - crowd-bridge + volumes: + - ../../services/libs/alerting/src:/usr/crowd/app/services/libs/alerting/src + - ../../services/libs/common/src:/usr/crowd/app/services/libs/common/src + - ../../services/libs/common_services/src:/usr/crowd/app/services/libs/common_services/src + - ../../services/libs/conversations/src:/usr/crowd/app/services/libs/conversations/src + - ../../services/libs/database/src:/usr/crowd/app/services/libs/database/src + - ../../services/libs/integrations/src:/usr/crowd/app/services/libs/integrations/src + - ../../services/libs/ioc/src:/usr/crowd/app/services/libs/ioc/src + - ../../services/libs/logging/src:/usr/crowd/app/services/libs/logging/src + - ../../services/libs/opensearch/src:/usr/crowd/app/services/libs/opensearch/src + - ../../services/libs/redis/src:/usr/crowd/app/services/libs/redis/src + - ../../services/libs/sentiment/src:/usr/crowd/app/services/libs/sentiment/src + - ../../services/libs/sqs/src:/usr/crowd/app/services/libs/sqs/src + - ../../services/libs/types/src:/usr/crowd/app/services/libs/types/src + - ../../services/apps/cache_worker/src:/usr/crowd/app/services/apps/cache_worker/src + +networks: + crowd-bridge: + external: true diff --git a/scripts/services/docker/Dockerfile.cache_worker b/scripts/services/docker/Dockerfile.cache_worker new file mode 100644 index 0000000000..ebc2107e71 --- /dev/null +++ b/scripts/services/docker/Dockerfile.cache_worker @@ -0,0 +1,18 @@ +FROM node:16-alpine as builder + +WORKDIR /usr/crowd/app +RUN corepack enable && apk add --update --no-cache python3 build-base && ln -sf python3 /usr/bin/python && python3 -m ensurepip && pip3 install --no-cache --upgrade pip setuptools + +COPY ./pnpm-workspace.yaml ./pnpm-lock.yaml ./ +COPY ./services ./services +RUN pnpm i --frozen-lockfile + +FROM node:16-bookworm-slim as runner + +WORKDIR /usr/crowd/app +RUN corepack enable && apt update && apt install -y ca-certificates --no-install-recommends && rm -rf /var/lib/apt/lists/* + +COPY --from=builder /usr/crowd/app/node_modules ./node_modules +COPY --from=builder /usr/crowd/app/services/libs ./services/libs +COPY --from=builder /usr/crowd/app/services/archetypes/ ./services/archetypes +COPY --from=builder /usr/crowd/app/services/apps/cache_worker/ ./services/apps/cache_worker \ No newline at end of file diff --git a/scripts/services/docker/Dockerfile.cache_worker.dockerignore b/scripts/services/docker/Dockerfile.cache_worker.dockerignore new file mode 100644 index 0000000000..77d313162a --- /dev/null +++ b/scripts/services/docker/Dockerfile.cache_worker.dockerignore @@ -0,0 +1,22 @@ +**/.git +**/node_modules +**/venv* +**/.webpack +**/.serverless +**/.cubestore +**/.env +**/.env.* +**/.idea +**/.vscode +**/dist +backend/server-config/ +backend/util/ +backend/src/serverless/microservices/python +.vscode/ +.github/ +frontend/ +scripts/ +.flake8 +*.md +Makefile +backend/ \ No newline at end of file diff --git a/services/apps/cache_worker/package.json b/services/apps/cache_worker/package.json index 2c187f8b0e..48bddbc2d2 100644 --- a/services/apps/cache_worker/package.json +++ b/services/apps/cache_worker/package.json @@ -3,9 +3,9 @@ "version": "1.0.0", "private": true, "scripts": { - "start": 
"CROWD_TEMPORAL_TASKQUEUE=merge-suggestions SERVICE=cache-worker TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/main.ts", - "start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=merge-suggestions SERVICE=cache-worker TS_NODE_TRANSPILE_ONLY=true LOG_LEVEL=trace node --inspect=0.0.0.0:9232 -r tsconfig-paths/register -r ts-node/register src/main.ts", - "start:debug": "CROWD_TEMPORAL_TASKQUEUE=merge-suggestions SERVICE=cache-worker TS_NODE_TRANSPILE_ONLY=true LOG_LEVEL=trace node --inspect=0.0.0.0:9232 -r tsconfig-paths/register -r ts-node/register src/main.ts", + "start": "CROWD_TEMPORAL_TASKQUEUE=cache SERVICE=cache-worker TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/main.ts", + "start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=cache SERVICE=cache-worker TS_NODE_TRANSPILE_ONLY=true LOG_LEVEL=trace node --inspect=0.0.0.0:9232 -r tsconfig-paths/register -r ts-node/register src/main.ts", + "start:debug": "CROWD_TEMPORAL_TASKQUEUE=cache SERVICE=cache-worker TS_NODE_TRANSPILE_ONLY=true LOG_LEVEL=trace node --inspect=0.0.0.0:9232 -r tsconfig-paths/register -r ts-node/register src/main.ts", "dev:local": "./node_modules/.bin/nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local", "dev": "./node_modules/.bin/nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug", "lint": "./node_modules/.bin/eslint --ext .ts src --max-warnings=0", diff --git a/services/apps/cache_worker/src/activities.ts b/services/apps/cache_worker/src/activities.ts new file mode 100644 index 0000000000..db4b39d25b --- /dev/null +++ b/services/apps/cache_worker/src/activities.ts @@ -0,0 +1,38 @@ +import { + getAllTenants, + getAllSegments, + getProjectLeafSegments, + getProjectGroupLeafSegments, +} from './activities/getTenantSegmentInfo' + +import { + getDashboardCacheLastRefreshedAt, + getDefaultSegment, + getNewMembers, + getActiveMembers, + getNewOrganizations, + getActiveOrganizations, + getActivities, + saveToCache, + getActivePlatforms, + findNewActivityPlatforms, + updateMemberMergeSuggestionsLastGeneratedAt, +} from './activities/dashboard-cache/refreshDashboardCache' + +export { + getAllTenants, + getAllSegments, + getProjectLeafSegments, + getProjectGroupLeafSegments, + getDashboardCacheLastRefreshedAt, + getDefaultSegment, + getNewMembers, + getActiveMembers, + getNewOrganizations, + getActiveOrganizations, + getActivities, + saveToCache, + getActivePlatforms, + findNewActivityPlatforms, + updateMemberMergeSuggestionsLastGeneratedAt, +} diff --git a/services/apps/cache_worker/src/activities/dashboard-cache/refreshDashboardCache.ts b/services/apps/cache_worker/src/activities/dashboard-cache/refreshDashboardCache.ts new file mode 100644 index 0000000000..740d192809 --- /dev/null +++ b/services/apps/cache_worker/src/activities/dashboard-cache/refreshDashboardCache.ts @@ -0,0 +1,171 @@ +import { svc } from '../../main' +import { ICubeQueryParams, IDashboardData, ISegment } from 'types' +import SegmentRepository from 'repo/segment.repo' +import { CubeJsRepository, CubeJsService } from '@crowd/cubejs' +import { RedisCache } from '@crowd/redis' +import { DashboardTimeframe } from 'enums' +import moment from 'moment' +import IntegrationRepository from 'repo/integration.repo' +import ActivityRepository from 
'repo/activity.repo' + +export async function getDashboardCacheLastRefreshedAt(segmentId: string): Promise<string> { + const segmentRepo = new SegmentRepository(svc.postgres.writer.connection(), svc.log) + return segmentRepo.getDashboardCacheLastRefreshedAt(segmentId) +} + +export async function getDefaultSegment(tenantId: string): Promise<ISegment> { + const segmentRepo = new SegmentRepository(svc.postgres.writer.connection(), svc.log) + return segmentRepo.getDefaultSegment(tenantId) +} + +export async function getActivePlatforms(leafSegmentIds: string[]): Promise<string[]> { + const integrationRepo = new IntegrationRepository(svc.postgres.writer.connection(), svc.log) + return integrationRepo.findActivePlatforms(leafSegmentIds) +} + +export async function findNewActivityPlatforms( + dashboardLastRefreshedAt: string, + leafSegmentIds: string[], +): Promise<string[]> { + const activityRepo = new ActivityRepository(svc.postgres.writer.connection(), svc.log) + return activityRepo.findNewActivityPlatforms(dashboardLastRefreshedAt, leafSegmentIds) +} + +export async function updateMemberMergeSuggestionsLastGeneratedAt( + segmentId: string, +): Promise<void> { + const segmentRepo = new SegmentRepository(svc.postgres.writer.connection(), svc.log) + await segmentRepo.updateDashboardCacheLastRefreshedAt(segmentId) +} + +export async function getNewMembers<T>(params: ICubeQueryParams) { + const cjs = new CubeJsService() + await cjs.init(params.tenantId, params.segmentIds) + + let result: T + try { + result = await CubeJsRepository.getNewMembers( + cjs, + moment(params.startDate), + moment(params.endDate), + params.granularity, + { + platform: params.platform, + }, + params.rawResult, + ) + } catch (err) { + throw new Error(err) + } + + return result +} + +export async function getActiveMembers<T>(params: ICubeQueryParams) { + const cjs = new CubeJsService() + await cjs.init(params.tenantId, params.segmentIds) + + let result: T + try { + result = await CubeJsRepository.getActiveMembers( + cjs, + moment(params.startDate), + moment(params.endDate), + params.granularity, + { + platform: params.platform, + }, + params.rawResult, + ) + } catch (err) { + throw new Error(err) + } + + return result +} + +export async function getNewOrganizations<T>(params: ICubeQueryParams) { + const cjs = new CubeJsService() + await cjs.init(params.tenantId, params.segmentIds) + + let result: T + try { + result = await CubeJsRepository.getNewOrganizations( + cjs, + moment(params.startDate), + moment(params.endDate), + params.granularity, + { + platform: params.platform, + }, + params.rawResult, + ) + } catch (err) { + throw new Error(err) + } + + return result +} + +export async function getActiveOrganizations<T>(params: ICubeQueryParams) { + const cjs = new CubeJsService() + await cjs.init(params.tenantId, params.segmentIds) + + let result: T + try { + result = await CubeJsRepository.getActiveOrganizations( + cjs, + moment(params.startDate), + moment(params.endDate), + params.granularity, + { + platform: params.platform, + }, + params.rawResult, + ) + } catch (err) { + throw new Error(err) + } + + return result +} + +export async function getActivities<T>(params: ICubeQueryParams) { + const cjs = new CubeJsService() + await cjs.init(params.tenantId, params.segmentIds) + + let result: T + try { + result = await CubeJsRepository.getNewActivities( + cjs, + moment(params.startDate), + moment(params.endDate), + params.granularity, + params.dimensions, + { + platform: params.platform, + }, + params.order, + params.rawResult, + ) + } catch (err) { + throw new Error(err) + } + + return
result +} + +export async function saveToCache( + tenantId: string, + segmentId: string, + timeframe: DashboardTimeframe, + cacheData: IDashboardData, + platform?: string, +): Promise { + const redisCache = new RedisCache(`dashboard-cache`, svc.redis, svc.log) + let key = `${tenantId}:${segmentId}:${timeframe}` + if (platform) { + key += `:${platform}` + } + await redisCache.set(key, JSON.stringify(cacheData)) +} diff --git a/services/apps/cache_worker/src/activities/getTenantSegmentInfo.ts b/services/apps/cache_worker/src/activities/getTenantSegmentInfo.ts new file mode 100644 index 0000000000..3484abf894 --- /dev/null +++ b/services/apps/cache_worker/src/activities/getTenantSegmentInfo.ts @@ -0,0 +1,34 @@ +import { ISegment, ITenant } from 'types' +import { svc } from '../main' +import TenantRepository from 'repo/tenant.repo' +import SegmentRepository from 'repo/segment.repo' + +export async function getAllTenants(): Promise { + const tenantRepository = new TenantRepository(svc.postgres.writer.connection(), svc.log) + return tenantRepository.getAllTenants() +} + +export async function getAllSegments( + tenantId: string, + limit: number, + offset: number, +): Promise { + const segmentRepository = new SegmentRepository(svc.postgres.writer.connection(), svc.log) + return segmentRepository.getAllSegments(tenantId, limit, offset) +} + +export async function getProjectLeafSegments( + parentSlug: string, + tenantId: string, +): Promise { + const segmentRepository = new SegmentRepository(svc.postgres.writer.connection(), svc.log) + return segmentRepository.getProjectLeafSegments(parentSlug, tenantId) +} + +export async function getProjectGroupLeafSegments( + grandparentSlug: string, + tenantId: string, +): Promise { + const segmentRepository = new SegmentRepository(svc.postgres.writer.connection(), svc.log) + return segmentRepository.getProjectGroupLeafSegments(grandparentSlug, tenantId) +} \ No newline at end of file diff --git a/services/apps/cache_worker/src/enums.ts b/services/apps/cache_worker/src/enums.ts new file mode 100644 index 0000000000..2ed0d8a23d --- /dev/null +++ b/services/apps/cache_worker/src/enums.ts @@ -0,0 +1,5 @@ +export enum DashboardTimeframe { + LAST_7_DAYS = '7d', + LAST_14_DAYS = '14d', + LAST_30_DAYS = '30d', +} diff --git a/services/apps/cache_worker/src/main.ts b/services/apps/cache_worker/src/main.ts index 840b025af4..f0cff9a4f3 100644 --- a/services/apps/cache_worker/src/main.ts +++ b/services/apps/cache_worker/src/main.ts @@ -15,6 +15,7 @@ const config: Config = { } const options: Options = { + maxConcurrentActivityTaskExecutions: 20, postgres: { enabled: true, }, diff --git a/services/apps/cache_worker/src/repo/activity.repo.ts b/services/apps/cache_worker/src/repo/activity.repo.ts new file mode 100644 index 0000000000..0f2474efec --- /dev/null +++ b/services/apps/cache_worker/src/repo/activity.repo.ts @@ -0,0 +1,40 @@ +import { DbConnection, DbTransaction } from '@crowd/database' +import { Logger } from '@crowd/logging' +import { IPlatforms } from 'types' + +class ActivityRepository { + constructor( + private readonly connection: DbConnection | DbTransaction, + private readonly log: Logger, + ) {} + + async findNewActivityPlatforms( + dashboardLastRefreshedAt: string, + leafSegmentIds: string[], + ): Promise { + let result: IPlatforms + try { + result = await this.connection.oneOrNone( + ` + select + array_agg(distinct a.platform) as platforms + from mv_activities_cube a + where a."segmentId" in ($(leafSegmentIds:csv)) + and a."createdAt" > 
$(dashboardLastRefreshedAt) + `, + { + leafSegmentIds, + dashboardLastRefreshedAt, + }, + ) + } catch (err) { + this.log.error('Error while getting new activity platforms', err) + + throw new Error(err) + } + + return result.platforms + } +} + +export default ActivityRepository diff --git a/services/apps/cache_worker/src/repo/integration.repo.ts b/services/apps/cache_worker/src/repo/integration.repo.ts new file mode 100644 index 0000000000..b1d13b3acc --- /dev/null +++ b/services/apps/cache_worker/src/repo/integration.repo.ts @@ -0,0 +1,37 @@ +import { DbConnection, DbTransaction } from '@crowd/database' +import { Logger } from '@crowd/logging' +import { IPlatforms } from 'types' + +class IntegrationRepository { + constructor( + private readonly connection: DbConnection | DbTransaction, + private readonly log: Logger, + ) {} + + async findActivePlatforms(leafSegmentIds: string[]): Promise<string[]> { + let result: IPlatforms + try { + result = await this.connection.oneOrNone( + ` + select + array_agg(distinct i.platform) as platforms + from integrations i + where i."segmentId" in ($(leafSegmentIds:csv)) + and i."deletedAt" is null + and i.status in ('done', 'in-progress'); + `, + { + leafSegmentIds, + }, + ) + } catch (err) { + this.log.error('Error while getting active integration platforms', err) + + throw new Error(err) + } + + return result.platforms + } +} + +export default IntegrationRepository diff --git a/services/apps/cache_worker/src/repo/segment.repo.ts b/services/apps/cache_worker/src/repo/segment.repo.ts new file mode 100644 index 0000000000..76134ee697 --- /dev/null +++ b/services/apps/cache_worker/src/repo/segment.repo.ts @@ -0,0 +1,173 @@ +import { DbConnection, DbTransaction } from '@crowd/database' +import { Logger } from '@crowd/logging' +import { IDashboardCacheLastRefreshedAt, ISegment } from 'types' + +class SegmentRepository { + constructor( + private readonly connection: DbConnection | DbTransaction, + private readonly log: Logger, + ) {} + + async getAllSegments(tenantId: string, limit: number, offset: number): Promise<ISegment[]> { + let rows: ISegment[] = [] + try { + rows = await this.connection.query( + ` + select + id as "segmentId", + slug, + "parentSlug", + "grandparentSlug" + from segments + where "tenantId" = $(tenantId) + order by id asc + limit $(limit) + offset $(offset) + + `, + { + tenantId, + limit, + offset, + }, + ) + } catch (err) { + this.log.error('Error while getting all segments', err) + + throw new Error(err) + } + + return rows || [] + } + + // getProjectLeafSegments + async getProjectLeafSegments(parentSlug: string, tenantId: string): Promise<ISegment[]> { + let rows: ISegment[] = [] + try { + rows = await this.connection.query( + ` + select + id as "segmentId", + slug, + "parentSlug", + "grandparentSlug" + from segments + where "parentSlug" = $(parentSlug) + and "tenantId" = $(tenantId) + and slug is not null + and "grandparentSlug" is not null; + + `, + { + tenantId, + parentSlug, + }, + ) + } catch (err) { + this.log.error(`Error while getting leaf segments of project ${parentSlug}`, err) + + throw new Error(err) + } + + return rows || [] + } + + async getProjectGroupLeafSegments( + grandparentSlug: string, + tenantId: string, + ): Promise<ISegment[]> { + let rows: ISegment[] = [] + try { + rows = await this.connection.query( + ` + select + id as "segmentId", + slug, + "parentSlug", + "grandparentSlug" + from segments + where "grandparentSlug" = $(grandparentSlug) + and "tenantId" = $(tenantId) + and slug is not null + and "parentSlug" is not null; + + `, + { + tenantId, + grandparentSlug, + }, + ) + } catch (err) { + this.log.error(`Error
while getting leaf segments of project group ${grandparentSlug}`, err) + + throw new Error(err) + } + + return rows || [] + } + + async getDefaultSegment(tenantId: string): Promise<ISegment> { + let result: ISegment + try { + result = await this.connection.oneOrNone( + ` + select + id as "segmentId", + slug, + "parentSlug", + "grandparentSlug" + from segments + where "tenantId" = $(tenantId) + and slug is not null + and "parentSlug" is not null + and "grandparentSlug" is not null; + + `, + { + tenantId, + }, + ) + } catch (err) { + this.log.error(`Error while getting the default segment of tenant ${tenantId}`, err) + + throw new Error(err) + } + + return result + } + + async getDashboardCacheLastRefreshedAt(segmentId: string): Promise<string> { + try { + const result: IDashboardCacheLastRefreshedAt = await this.connection.oneOrNone( + ` + select "dashboardCacheLastRefreshedAt" + from segments + where "id" = $(segmentId);`, + { + segmentId, + }, + ) + return result?.dashboardCacheLastRefreshedAt + } catch (err) { + throw new Error(err) + } + } + + async updateDashboardCacheLastRefreshedAt(segmentId: string): Promise<void> { + try { + await this.connection.any( + ` + update segments set "dashboardCacheLastRefreshedAt" = now() + where "id" = $(segmentId); + `, + { + segmentId, + }, + ) + } catch (err) { + throw new Error(err) + } + } +} + +export default SegmentRepository diff --git a/services/apps/cache_worker/src/repo/tenant.repo.ts b/services/apps/cache_worker/src/repo/tenant.repo.ts new file mode 100644 index 0000000000..7b0212d9f7 --- /dev/null +++ b/services/apps/cache_worker/src/repo/tenant.repo.ts @@ -0,0 +1,33 @@ +import { DbConnection, DbTransaction } from '@crowd/database' +import { Logger } from '@crowd/logging' +import { ITenant } from 'types' + +class TenantRepository { + constructor( + private readonly connection: DbConnection | DbTransaction, + private readonly log: Logger, + ) {} + + async getAllTenants(): Promise<ITenant[]> { + let rows: ITenant[] = [] + try { + rows = await this.connection.query(` + select + id as "tenantId", + plan + from tenants + where "deletedAt" is null + and plan IN ('Scale', 'Growth', 'Essential') + and ("trialEndsAt" > NOW() or "trialEndsAt" is null); + `) + } catch (err) { + this.log.error('Error while getting all tenants', err) + + throw new Error(err) + } + + return rows + } +} + +export default TenantRepository diff --git a/services/apps/cache_worker/src/types.ts b/services/apps/cache_worker/src/types.ts new file mode 100644 index 0000000000..389ffea644 --- /dev/null +++ b/services/apps/cache_worker/src/types.ts @@ -0,0 +1,110 @@ +import { CubeDimension, CubeGranularity, CubeMeasure, ICubeOrder } from '@crowd/cubejs' + +export interface ITenant { + tenantId: string + plan: string +} + +export interface ISegment { + segmentId: string + slug: string + parentSlug: string + grandparentSlug: string +} + +export interface IDashboardCacheLastRefreshedAt { + dashboardCacheLastRefreshedAt: string +} + +export interface IActiveMembersTimeseriesResult { + [CubeDimension.ACTIVITY_DATE_DAY]: string + [CubeDimension.ACTIVITY_DATE]: string + [CubeMeasure.MEMBER_COUNT]: string +} + +export interface INewMembersTimeseriesResult { + [CubeDimension.MEMBER_JOINED_AT_DAY]: string + [CubeDimension.MEMBER_JOINED_AT]: string + [CubeMeasure.MEMBER_COUNT]: string +} + +export interface INewOrganizationsTimeseriesResult { + [CubeDimension.ORGANIZATIONS_JOINED_AT_DAY]: string + [CubeDimension.ORGANIZATIONS_JOINED_AT]: string + [CubeMeasure.ORGANIZATION_COUNT]: string +} + +export interface
IActiveOrganizationsTimeseriesResult { + [CubeDimension.ACTIVITY_DATE_DAY]: string + [CubeDimension.ACTIVITY_DATE]: string + [CubeMeasure.ORGANIZATION_COUNT]: string +} + +export interface IActivityTimeseriesResult { + [CubeDimension.ACTIVITY_DATE_DAY]: string + [CubeDimension.ACTIVITY_DATE]: string + [CubeMeasure.ACTIVITY_COUNT]: string +} + +export interface IActivityBySentimentMoodResult { + [CubeDimension.ACTIVITY_SENTIMENT_MOOD]: string +} + +export interface IActivityByTypeAndPlatformResult { + [CubeDimension.ACTIVITY_TYPE]: string + [CubeDimension.ACTIVITY_PLATFORM]: string + [CubeMeasure.ACTIVITY_COUNT]: string +} + +export interface IDashboardData { + activeMembers: { + total: number + previousPeriodTotal: number + timeseries: IActiveMembersTimeseriesResult[] + } + newMembers: { + total: number + previousPeriodTotal: number + timeseries: INewMembersTimeseriesResult[] + } + newOrganizations: { + total: number + previousPeriodTotal: number + timeseries: INewOrganizationsTimeseriesResult[] + } + activeOrganizations: { + total: number + previousPeriodTotal: number + timeseries: IActiveOrganizationsTimeseriesResult[] + } + activity: { + total: number + previousPeriodTotal: number + timeseries: IActivityTimeseriesResult[] + bySentimentMood: IActivityBySentimentMoodResult[] + byTypeAndPlatform: IActivityByTypeAndPlatformResult[] + } +} + +export interface ITimeframe { + startDate: string + endDate: string + previousPeriodStartDate: string + previousPeriodEndDate: string +} + +export interface ICubeQueryParams { + tenantId: string + segmentIds: string[] + startDate: string + endDate: string + granularity?: CubeGranularity | string + platform?: string + rawResult?: boolean + dimensions?: CubeDimension[] | string[] + order?: ICubeOrder +} + +export interface IPlatforms { + platforms: string[] +} diff --git a/services/apps/cache_worker/src/workflows.ts b/services/apps/cache_worker/src/workflows.ts new file mode 100644 index 0000000000..015766b58d --- /dev/null +++ b/services/apps/cache_worker/src/workflows.ts @@ -0,0 +1,5 @@ +import { refreshDashboardCache } from './workflows/refreshDashboardCache' + +import { spawnDashboardCacheRefreshForAllTenants } from './workflows/spawnDashboardCacheRefreshForAllTenants' + +export { spawnDashboardCacheRefreshForAllTenants, refreshDashboardCache } diff --git a/services/apps/cache_worker/src/workflows/refreshDashboardCache.ts b/services/apps/cache_worker/src/workflows/refreshDashboardCache.ts new file mode 100644 index 0000000000..7726c0eef4 --- /dev/null +++ b/services/apps/cache_worker/src/workflows/refreshDashboardCache.ts @@ -0,0 +1,367 @@ +import { proxyActivities } from '@temporalio/workflow' +import * as activities from '../activities/dashboard-cache/refreshDashboardCache' + +import { IProcessRefreshDashboardCacheArgs } from '@crowd/types' +import { DashboardTimeframe } from '../enums' +import { + IActiveMembersTimeseriesResult, + IActiveOrganizationsTimeseriesResult, + IActivityBySentimentMoodResult, + IActivityByTypeAndPlatformResult, + IActivityTimeseriesResult, + IDashboardData, + INewMembersTimeseriesResult, + INewOrganizationsTimeseriesResult, + ITimeframe, +} from 'types' +import moment from 'moment' + +const activity = proxyActivities<typeof activities>({ startToCloseTimeout: '1 minute' }) + +export async function refreshDashboardCache( + args: IProcessRefreshDashboardCacheArgs, +): Promise<void> { + console.log(args) + + // if no segments were sent, set current segment as default one + if (!args.segmentId) { + const defaultSegment = await
activity.getDefaultSegment(args.tenantId) + args.segmentId = defaultSegment.segmentId + args.leafSegmentIds = [defaultSegment.segmentId] + } + + const dashboardLastRefreshedAt = await activity.getDashboardCacheLastRefreshedAt(args.segmentId) + + const activePlatforms = await activity.getActivePlatforms(args.leafSegmentIds) + + if (!dashboardLastRefreshedAt) { + // main view with no platform filter + await refreshDashboardCacheForAllTimeranges(args.tenantId, args.segmentId, args.leafSegmentIds) + + // for each platform also cache dashboard values + for (const platform of activePlatforms) { + await refreshDashboardCacheForAllTimeranges( + args.tenantId, + args.segmentId, + args.leafSegmentIds, + platform, + ) + } + } else { + // first check if there's a new activity between dashboardLastRefreshedAt and now() + const platforms = await activity.findNewActivityPlatforms( + dashboardLastRefreshedAt, + args.leafSegmentIds, + ) + + // only refresh the main view and returned platform views if there are new activities + if (platforms && platforms.length > 0) { + // refresh the main view + await refreshDashboardCacheForAllTimeranges( + args.tenantId, + args.segmentId, + args.leafSegmentIds, + ) + + for (const platform of platforms) { + await refreshDashboardCacheForAllTimeranges( + args.tenantId, + args.segmentId, + args.leafSegmentIds, + platform, + ) + } + } else { + console.log('No new activities found.. not calculating cache again!') + } + } + + // update dashboardLastRefreshedAt + await activity.updateMemberMergeSuggestionsLastGeneratedAt(args.segmentId) + console.log( + `Done generating dashboard cache for tenant ${args.tenantId}, segment: ${args.segmentId}`, + ) +} + +async function refreshDashboardCacheForAllTimeranges( + tenantId: string, + segmentId: string, + leafSegmentIds: string[], + platform?: string, +) { + const info = platform ?? 
'all' + console.log(`Refreshing cache for ${info}!`) + for (const timeframe in DashboardTimeframe) { + const data = await getDashboardCacheData( + tenantId, + leafSegmentIds, + DashboardTimeframe[timeframe], + platform, + ) + + // try saving it to cache + await activity.saveToCache(tenantId, segmentId, DashboardTimeframe[timeframe], data, platform) + } +} + +async function getDashboardCacheData( + tenantId: string, + segmentIds: string[], + timeframe: DashboardTimeframe, + platform?: string, +): Promise<IDashboardData> { + // build dateranges + const { startDate, endDate, previousPeriodStartDate, previousPeriodEndDate } = + buildTimeframe(timeframe) + + // new members total + const newMembersTotal = await activity.getNewMembers<number>({ + tenantId, + segmentIds, + startDate, + endDate, + platform, + }) + + // new members previous period total + const newMembersPreviousPeriodTotal = await activity.getNewMembers<number>({ + tenantId, + segmentIds, + startDate: previousPeriodStartDate, + endDate: previousPeriodEndDate, + platform, + }) + + // new members timeseries + const newMembersTimeseries = await activity.getNewMembers<INewMembersTimeseriesResult[]>({ + tenantId, + segmentIds, + startDate: previousPeriodStartDate, + endDate: previousPeriodEndDate, + granularity: 'day', + platform, + rawResult: true, + }) + + // active members total + const activeMembersTotal = await activity.getActiveMembers<number>({ + tenantId, + segmentIds, + startDate, + endDate, + platform, + }) + + // active members previous period total + const activeMembersPreviousPeriodTotal = await activity.getActiveMembers<number>({ + tenantId, + segmentIds, + startDate: previousPeriodStartDate, + endDate: previousPeriodEndDate, + platform, + }) + + // active members timeseries + const activeMembersTimeseries = await activity.getActiveMembers<IActiveMembersTimeseriesResult[]>( + { + tenantId, + segmentIds, + startDate: previousPeriodStartDate, + endDate: previousPeriodEndDate, + granularity: 'day', + platform, + rawResult: true, + }, + ) + + // new organizations total + const newOrganizationsTotal = await activity.getNewOrganizations<number>({ + tenantId, + segmentIds, + startDate, + endDate, + platform, + }) + + // new organizations previous period total + const newOrganizationsPreviousPeriodTotal = await activity.getNewOrganizations<number>({ + tenantId, + segmentIds, + startDate: previousPeriodStartDate, + endDate: previousPeriodEndDate, + platform, + }) + + // new organizations timeseries + const newOrganizationsTimeseries = await activity.getNewOrganizations< + INewOrganizationsTimeseriesResult[] + >({ + tenantId, + segmentIds, + startDate: previousPeriodStartDate, + endDate: previousPeriodEndDate, + granularity: 'day', + platform, + rawResult: true, + }) + + // active organizations total + const activeOrganizationsTotal = await activity.getActiveOrganizations<number>({ + tenantId, + segmentIds, + startDate, + endDate, + platform, + }) + + // active organizations previous period total + const activeOrganizationsPreviousPeriodTotal = await activity.getActiveOrganizations<number>({ + tenantId, + segmentIds, + startDate: previousPeriodStartDate, + endDate: previousPeriodEndDate, + platform, + }) + + // active organizations timeseries + const activeOrganizationsTimeseries = await activity.getActiveOrganizations< + IActiveOrganizationsTimeseriesResult[] + >({ + tenantId, + segmentIds, + startDate: previousPeriodStartDate, + endDate: previousPeriodEndDate, + granularity: 'day', + platform, + rawResult: true, + }) + + // activities total + const activitiesTotal = await activity.getActivities<number>({ + tenantId, + segmentIds, + startDate, + endDate, + platform, + }) + + //
activities previous period total + const activitiesPreviousPeriodTotal = await activity.getActivities<number>({ + tenantId, + segmentIds, + startDate: previousPeriodStartDate, + endDate: previousPeriodEndDate, + platform, + }) + + // activities timeseries + const activitiesTimeseries = await activity.getActivities<IActivityTimeseriesResult[]>({ + tenantId, + segmentIds, + startDate: previousPeriodStartDate, + endDate: previousPeriodEndDate, + granularity: 'day', + platform, + rawResult: true, + }) + + // activities by sentiment mood + const activitiesBySentimentMood = await activity.getActivities<IActivityBySentimentMoodResult[]>({ + tenantId, + segmentIds, + startDate, + endDate, + dimensions: ['Activities.sentimentMood'], + platform, + rawResult: true, + }) + + // activities by type and platform + const activitiesByTypeAndPlatform = await activity.getActivities< + IActivityByTypeAndPlatformResult[] + >({ + tenantId, + segmentIds, + startDate, + endDate, + dimensions: ['Activities.type', 'Activities.platform'], + platform, + rawResult: true, + }) + + return { + newMembers: { + total: newMembersTotal, + previousPeriodTotal: newMembersPreviousPeriodTotal, + timeseries: newMembersTimeseries, + }, + activeMembers: { + total: activeMembersTotal, + previousPeriodTotal: activeMembersPreviousPeriodTotal, + timeseries: activeMembersTimeseries, + }, + newOrganizations: { + total: newOrganizationsTotal, + previousPeriodTotal: newOrganizationsPreviousPeriodTotal, + timeseries: newOrganizationsTimeseries, + }, + activeOrganizations: { + total: activeOrganizationsTotal, + previousPeriodTotal: activeOrganizationsPreviousPeriodTotal, + timeseries: activeOrganizationsTimeseries, + }, + activity: { + total: activitiesTotal, + previousPeriodTotal: activitiesPreviousPeriodTotal, + timeseries: activitiesTimeseries, + bySentimentMood: activitiesBySentimentMood, + byTypeAndPlatform: activitiesByTypeAndPlatform, + }, + } +} + +function buildTimeframe(timeframe: DashboardTimeframe): ITimeframe { + if (timeframe === DashboardTimeframe.LAST_7_DAYS) { + const startDate = moment().subtract(6, 'days').startOf('day').toISOString() + const endDate = moment().endOf('day').toISOString() + const previousPeriodStartDate = moment().subtract(13, 'days').startOf('day').toISOString() + const previousPeriodEndDate = moment().subtract(7, 'days').endOf('day').toISOString() + + return { + startDate, + endDate, + previousPeriodStartDate, + previousPeriodEndDate, + } + } + + if (timeframe === DashboardTimeframe.LAST_14_DAYS) { + const startDate = moment().subtract(13, 'days').startOf('day').toISOString() + const endDate = moment().endOf('day').toISOString() + const previousPeriodStartDate = moment().subtract(27, 'days').startOf('day').toISOString() + const previousPeriodEndDate = moment().subtract(14, 'days').endOf('day').toISOString() + + return { + startDate, + endDate, + previousPeriodStartDate, + previousPeriodEndDate, + } + } + + if (timeframe === DashboardTimeframe.LAST_30_DAYS) { + const startDate = moment().subtract(29, 'days').startOf('day').toISOString() + const endDate = moment().endOf('day').toISOString() + const previousPeriodStartDate = moment().subtract(59, 'days').startOf('day').toISOString() + const previousPeriodEndDate = moment().subtract(30, 'days').endOf('day').toISOString() + + return { + startDate, + endDate, + previousPeriodStartDate, + previousPeriodEndDate, + } + } + + throw new Error(`Unsupported timeframe ${timeframe}!`) +} diff --git a/services/apps/cache_worker/src/workflows/spawnDashboardCacheRefreshForAllTenants.ts
b/services/apps/cache_worker/src/workflows/spawnDashboardCacheRefreshForAllTenants.ts new file mode 100644 index 0000000000..6e11e21b62 --- /dev/null +++ b/services/apps/cache_worker/src/workflows/spawnDashboardCacheRefreshForAllTenants.ts @@ -0,0 +1,113 @@ +import { + proxyActivities, + startChild, + ParentClosePolicy, + ChildWorkflowCancellationType, + workflowInfo, +} from '@temporalio/workflow' + +import * as activities from '../activities/getTenantSegmentInfo' +import { refreshDashboardCache } from './refreshDashboardCache' +// import { isFeatureEnabled } from '@crowd/feature-flags' +// import { FeatureFlag } from '@crowd/types' +import { ISegment } from 'types' + +const activity = proxyActivities<typeof activities>({ startToCloseTimeout: '1 minute' }) + +export async function spawnDashboardCacheRefreshForAllTenants(): Promise<void> { + const tenants = await activity.getAllTenants() + // const segmentsEnabled = await isFeatureEnabled(FeatureFlag.SEGMENTS, null) + const segmentsEnabled = false + const info = workflowInfo() + + if (segmentsEnabled) { + // we should spawn refreshDashboardCache for each tenant-segment pair + + const SEGMENT_PAGE_SIZE = 250 + let offset = 0 + let segments: ISegment[] + + const segmentLeafIdMap = new Map<string, string[]>() + + for (const tenant of tenants) { + // get all segments in tenant + + do { + segments = await activity.getAllSegments(tenant.tenantId, SEGMENT_PAGE_SIZE, offset) + + // find each segment's associated leaf segments + for (const segment of segments) { + if (segment.slug && segment.parentSlug && segment.grandparentSlug) { + // it's already a leaf segment, add itself to leaf segment id map + segmentLeafIdMap.set(segment.segmentId, [segment.segmentId]) + } else if (segment.slug && segment.parentSlug && !segment.grandparentSlug) { + // it's a parent segment, find its leafs + const leafs = await activity.getProjectLeafSegments(segment.slug, tenant.tenantId) + segmentLeafIdMap.set( + segment.segmentId, + leafs.map((l) => l.segmentId), + ) + } else if (segment.slug && !segment.parentSlug && !segment.grandparentSlug) { + // it's a grandparent segment, find its leafs + const leafs = await activity.getProjectGroupLeafSegments(segment.slug, tenant.tenantId) + segmentLeafIdMap.set( + segment.segmentId, + leafs.map((l) => l.segmentId), + ) + } + } + + offset += SEGMENT_PAGE_SIZE + } while (segments.length > 0) + + // create a workflow for each tenantId-segmentId pair + await Promise.all( + Array.from(segmentLeafIdMap).map(([segmentId, leafSegmentIds]) => { + return startChild(refreshDashboardCache, { + workflowId: `${info.workflowId}/${tenant.tenantId}/${segmentId}`, + cancellationType: ChildWorkflowCancellationType.ABANDON, + parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON, + retry: { + backoffCoefficient: 2, + initialInterval: 2 * 1000, + maximumInterval: 30 * 1000, + }, + args: [ + { + tenantId: tenant.tenantId, + segmentId, + leafSegmentIds, + }, + ], + searchAttributes: { + TenantId: [tenant.tenantId], + }, + }) + }), + ) + } + } else { + await Promise.all( + tenants.map((tenant) => { + return startChild(refreshDashboardCache, { + workflowId: `${info.workflowId}/${tenant.tenantId}`, + cancellationType: ChildWorkflowCancellationType.ABANDON, + parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON, + retry: { + backoffCoefficient: 2, + initialInterval: 2 * 1000, + maximumInterval: 30 * 1000, + }, + args: [ + { + tenantId: tenant.tenantId, + }, + ], + searchAttributes: { + TenantId: [tenant.tenantId], + }, + }) + }), + ) + } +} diff --git
a/services/archetypes/worker/src/index.ts b/services/archetypes/worker/src/index.ts index 1febedad7e..f7186174e5 100644 --- a/services/archetypes/worker/src/index.ts +++ b/services/archetypes/worker/src/index.ts @@ -38,6 +38,7 @@ Options is used to configure the worker service. */ export interface Options { maxTaskQueueActivitiesPerSecond?: number + maxConcurrentActivityTaskExecutions?: number postgres: { enabled: boolean } @@ -210,6 +211,7 @@ export class ServiceWorker extends Service { activities: require(path.resolve('./src/activities')), dataConverter: await getDataConverter(), maxTaskQueueActivitiesPerSecond: this.options.maxTaskQueueActivitiesPerSecond, + maxConcurrentActivityTaskExecutions: this.options.maxConcurrentActivityTaskExecutions, }) } catch (err) { throw new Error(err) diff --git a/services/libs/cubejs/src/enums.ts b/services/libs/cubejs/src/enums.ts index 47e7890868..9a174c1358 100644 --- a/services/libs/cubejs/src/enums.ts +++ b/services/libs/cubejs/src/enums.ts @@ -1,15 +1,18 @@ export enum CubeDimension { MEMBER_JOINED_AT = 'Members.joinedAt', + MEMBER_JOINED_AT_DAY = 'Members.joinedAt.day', IS_TEAM_MEMBER = 'Members.isTeamMember', IS_BOT = 'Members.isBot', IS_ORGANIZATION = 'Members.isOrganization', ACTIVITY_DATE = 'Activities.date', + ACTIVITY_DATE_DAY = 'Activities.date.day', ACTIVITY_PLATFORM = 'Activities.platform', ACTIVITY_TYPE = 'Activities.type', ACTIVITY_SENTIMENT_MOOD = 'Activities.sentimentMood', CONVERSATION_CREATED_AT = 'Conversations.createdat', CONVERSATION_FIRST_ACTIVITY_TIME = 'Conversations.firstActivityTime', ORGANIZATIONS_JOINED_AT = 'Organizations.joinedAt', + ORGANIZATIONS_JOINED_AT_DAY = 'Organizations.joinedAt.day', SEGMENTS_ID = 'Segments.id', } diff --git a/services/libs/cubejs/src/metrics/activeMembers.ts b/services/libs/cubejs/src/metrics/activeMembers.ts index 89e7ac6775..7409a66dcf 100644 --- a/services/libs/cubejs/src/metrics/activeMembers.ts +++ b/services/libs/cubejs/src/metrics/activeMembers.ts @@ -17,7 +17,7 @@ export default async ( cjs: CubeJsService, startDate: moment.Moment, endDate: moment.Moment, - granularity: CubeGranularity = null, + granularity: CubeGranularity | string = null, filter: IDashboardFilter = {}, rawResult = false, ) => { @@ -47,11 +47,11 @@ export default async ( }) } - if (filter.segment) { + if (filter.segments) { filters.push({ member: CubeDimension.SEGMENTS_ID, operator: 'equals', - values: [filter.segment], + values: filter.segments, }) } diff --git a/services/libs/cubejs/src/metrics/activeOrganizations.ts b/services/libs/cubejs/src/metrics/activeOrganizations.ts index 5f3601991a..1bc4163d34 100644 --- a/services/libs/cubejs/src/metrics/activeOrganizations.ts +++ b/services/libs/cubejs/src/metrics/activeOrganizations.ts @@ -16,7 +16,7 @@ export default async ( cjs: CubeJsService, startDate: moment.Moment, endDate: moment.Moment, - granularity: CubeGranularity = undefined, + granularity: CubeGranularity | string = undefined, filter: IDashboardFilter = {}, rawResult = false, ) => { @@ -41,11 +41,11 @@ export default async ( }) } - if (filter.segment) { + if (filter.segments) { filters.push({ member: CubeDimension.SEGMENTS_ID, operator: 'equals', - values: [filter.segment], + values: filter.segments, }) } diff --git a/services/libs/cubejs/src/metrics/index.ts b/services/libs/cubejs/src/metrics/index.ts index 47b7630371..f05376e28a 100644 --- a/services/libs/cubejs/src/metrics/index.ts +++ b/services/libs/cubejs/src/metrics/index.ts @@ -1,5 +1,4 @@ export { default as newActivities } from './newActivities' 
-export { default as newActivitiesTimeseries } from './newActivitiesTimeseries' export { default as newConversations } from './newConversations' export { default as activeMembers } from './activeMembers' export { default as newMembers } from './newMembers' diff --git a/services/libs/cubejs/src/metrics/newActivities.ts b/services/libs/cubejs/src/metrics/newActivities.ts index e577652747..666029dc80 100644 --- a/services/libs/cubejs/src/metrics/newActivities.ts +++ b/services/libs/cubejs/src/metrics/newActivities.ts @@ -16,8 +16,8 @@ export default async ( cjs: CubeJsService, startDate: moment.Moment, endDate: moment.Moment, - granularity: CubeGranularity = null, - dimensions: CubeDimension[] = [], + granularity: CubeGranularity | string = null, + dimensions: CubeDimension[] | string[] = [], filter: IDashboardFilter = {}, order: ICubeOrder = { [CubeDimension.ACTIVITY_DATE]: CubeOrderDirection.ASC }, rawResult = false, @@ -43,11 +43,11 @@ export default async ( }) } - if (filter.segment) { + if (filter.segments) { filters.push({ member: CubeDimension.SEGMENTS_ID, operator: 'equals', - values: [filter.segment], + values: filter.segments, }) } diff --git a/services/libs/cubejs/src/metrics/newActivitiesTimeseries.ts b/services/libs/cubejs/src/metrics/newActivitiesTimeseries.ts deleted file mode 100644 index 2ccca1feb4..0000000000 --- a/services/libs/cubejs/src/metrics/newActivitiesTimeseries.ts +++ /dev/null @@ -1,69 +0,0 @@ -import moment from 'moment' - -import { CubeJsService } from '../service' -import { ICubeFilter } from '../types' -import { CubeGranularity, CubeDimension, CubeMeasure } from '../enums' - -/** - * Gets `new activities` timeseries data for a given date range in given granularity. - * Activities are new when activity.timestamp is in between given date range. 
- * @param cjs cubejs service instance - * @param startDate - * @param endDate - * @returns - */ -export default async ( - cjs: CubeJsService, - startDate: moment.Moment, - endDate: moment.Moment, - granularity: CubeGranularity = CubeGranularity.DAY, - platform: string = undefined, - segment: string = undefined, -) => { - const filters: ICubeFilter[] = [ - { - member: CubeDimension.IS_TEAM_MEMBER, - operator: 'equals', - values: ['false'], - }, - { - member: CubeDimension.IS_BOT, - operator: 'equals', - values: ['false'], - }, - ] - - if (platform) { - filters.push({ - member: CubeDimension.ACTIVITY_PLATFORM, - operator: 'equals', - values: [platform], - }) - } - - if (segment) { - filters.push({ - member: CubeDimension.SEGMENTS_ID, - operator: 'equals', - values: [segment], - }) - } - - const query = { - measures: [CubeMeasure.ACTIVITY_COUNT], - timeDimensions: [ - { - dimension: CubeDimension.ACTIVITY_DATE, - dateRange: [startDate.format('YYYY-MM-DD'), endDate.format('YYYY-MM-DD')], - granularity, - }, - ], - filters, - } - - cjs.log.info(query) - - const newActivitiesTimeseries = await cjs.load(query) - - return newActivitiesTimeseries || [] -} diff --git a/services/libs/cubejs/src/metrics/newMembers.ts b/services/libs/cubejs/src/metrics/newMembers.ts index 69ae85722f..b75cdf91b0 100644 --- a/services/libs/cubejs/src/metrics/newMembers.ts +++ b/services/libs/cubejs/src/metrics/newMembers.ts @@ -3,6 +3,9 @@ import moment from 'moment' import { CubeJsService } from '../service' import { CubeGranularity, CubeDimension, CubeMeasure } from '../enums' import { ICubeFilter, IDashboardFilter } from '../types' +import { getServiceLogger } from '@crowd/logging' + +const log = getServiceLogger() /** * Gets `new members` count or timeseries data for a given date range and granularity. 
@@ -16,7 +19,7 @@ export default async ( cjs: CubeJsService, startDate: moment.Moment, endDate: moment.Moment, - granularity: CubeGranularity = null, + granularity: CubeGranularity | string = null, filter: IDashboardFilter = {}, rawResult = false, ) => { @@ -46,15 +49,15 @@ export default async ( }) } - if (filter.segment) { + if (filter.segments) { filters.push({ member: CubeDimension.SEGMENTS_ID, operator: 'equals', - values: [filter.segment], + values: filter.segments, }) } - const newMembers = await cjs.load({ + const query = { measures: [CubeMeasure.MEMBER_COUNT], timeDimensions: [ { @@ -65,7 +68,9 @@ export default async ( ], order: { [CubeDimension.MEMBER_JOINED_AT]: 'asc' }, filters, - }) + } + + const newMembers = await cjs.load(query) if (rawResult || granularity) { return newMembers diff --git a/services/libs/cubejs/src/metrics/newOrganizations.ts b/services/libs/cubejs/src/metrics/newOrganizations.ts index 40ab091f6a..a250146bc2 100644 --- a/services/libs/cubejs/src/metrics/newOrganizations.ts +++ b/services/libs/cubejs/src/metrics/newOrganizations.ts @@ -16,7 +16,7 @@ export default async ( cjs: CubeJsService, startDate: moment.Moment, endDate: moment.Moment, - granularity: CubeGranularity = null, + granularity: CubeGranularity | string = null, filter: IDashboardFilter = {}, rawResult = false, ) => { @@ -41,11 +41,11 @@ export default async ( }) } - if (filter.segment) { + if (filter.segments) { filters.push({ member: CubeDimension.SEGMENTS_ID, operator: 'equals', - values: [filter.segment], + values: filter.segments, }) } diff --git a/services/libs/cubejs/src/repository.ts b/services/libs/cubejs/src/repository.ts index 0bbc1af01e..7d68cdf739 100644 --- a/services/libs/cubejs/src/repository.ts +++ b/services/libs/cubejs/src/repository.ts @@ -5,8 +5,6 @@ export class CubeJsRepository { static getNewActivities = metrics.newActivities - static getNewActivitiesTimeseries = metrics.newActivitiesTimeseries - static getNewConversations = metrics.newConversations static getNewMembers = metrics.newMembers diff --git a/services/libs/cubejs/src/types.ts b/services/libs/cubejs/src/types.ts index 8e8d84bbd3..bc3d54dc72 100644 --- a/services/libs/cubejs/src/types.ts +++ b/services/libs/cubejs/src/types.ts @@ -8,7 +8,7 @@ export interface ICubeFilter { export interface IDashboardFilter { platform?: string - segment?: string + segments?: string[] } type CubeOrderKey = CubeDimension | CubeMeasure diff --git a/services/libs/types/src/temporal/cache.ts b/services/libs/types/src/temporal/cache.ts new file mode 100644 index 0000000000..8e5ed585ec --- /dev/null +++ b/services/libs/types/src/temporal/cache.ts @@ -0,0 +1,5 @@ +export interface IProcessRefreshDashboardCacheArgs { + tenantId: string + segmentId?: string + leafSegmentIds?: string[] +} diff --git a/services/libs/types/src/temporal/index.ts b/services/libs/types/src/temporal/index.ts index 26e659f79a..41ca6de385 100644 --- a/services/libs/types/src/temporal/index.ts +++ b/services/libs/types/src/temporal/index.ts @@ -1 +1,2 @@ export * from './automations' +export * from './cache'