Skip to content

Commit

Permalink
Fix redis connection (twentyhq#7956)
Browse files Browse the repository at this point in the history
## Context
bull-mq connection was not working as intended, the connection parameter
was ignored and was falling back to localhost.
This PR should fix the issue by instantiating a IORedis client following
bullmq documentation https://docs.bullmq.io/guide/connections
I also changed cache-storage module to use IORedis client as well to be
more consistent even though it was not necessary there. We could move
that instantiation to a factory class in the future.

## Test
start server + worker with correct port and wrong port with
cache-storage-type memory/redis and queue-type sync/bull-mq
  • Loading branch information
Weiko authored Oct 22, 2024
1 parent 18cfe79 commit 02c34d5
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import { redisStore } from 'cache-manager-redis-yet';

import { CacheStorageType } from 'src/engine/core-modules/cache-storage/types/cache-storage-type.enum';
import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service';
import { RedisClientService } from 'src/engine/core-modules/redis-client/redis-client.service';

export const cacheStorageModuleFactory = (
environmentService: EnvironmentService,
redisClientService: RedisClientService,
): CacheModuleOptions => {
const cacheStorageType = environmentService.get('CACHE_STORAGE_TYPE');
const cacheStorageTtl = environmentService.get('CACHE_STORAGE_TTL');
Expand All @@ -20,18 +22,10 @@ export const cacheStorageModuleFactory = (
return cacheModuleOptions;
}
case CacheStorageType.Redis: {
const connectionString = environmentService.get('REDIS_URL');

if (!connectionString) {
throw new Error(
`${cacheStorageType} cache storage requires REDIS_URL to be defined, check your .env file`,
);
}

return {
...cacheModuleOptions,
store: redisStore,
url: connectionString,
client: redisClientService.getClient(),
};
}
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { FlushCacheCommand } from 'src/engine/core-modules/cache-storage/command
import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service';
import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum';
import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service';
import { RedisClientService } from 'src/engine/core-modules/redis-client/redis-client.service';

@Global()
@Module({
Expand All @@ -15,7 +16,7 @@ import { EnvironmentService } from 'src/engine/core-modules/environment/environm
isGlobal: true,
imports: [ConfigModule],
useFactory: cacheStorageModuleFactory,
inject: [EnvironmentService],
inject: [EnvironmentService, RedisClientService],
}),
],
providers: [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,17 @@ import { messageQueueModuleFactory } from 'src/engine/core-modules/message-queue
import { TimelineMessagingModule } from 'src/engine/core-modules/messaging/timeline-messaging.module';
import { OpenApiModule } from 'src/engine/core-modules/open-api/open-api.module';
import { PostgresCredentialsModule } from 'src/engine/core-modules/postgres-credentials/postgres-credentials.module';
import { RedisClientModule } from 'src/engine/core-modules/redis-client/redis-client.module';
import { RedisClientService } from 'src/engine/core-modules/redis-client/redis-client.service';
import { serverlessModuleFactory } from 'src/engine/core-modules/serverless/serverless-module.factory';
import { ServerlessModule } from 'src/engine/core-modules/serverless/serverless.module';
import { WorkspaceSSOModule } from 'src/engine/core-modules/sso/sso.module';
import { TelemetryModule } from 'src/engine/core-modules/telemetry/telemetry.module';
import { UserModule } from 'src/engine/core-modules/user/user.module';
import { WorkflowTriggerApiModule } from 'src/engine/core-modules/workflow/workflow-trigger-api.module';
import { WorkspaceInvitationModule } from 'src/engine/core-modules/workspace-invitation/workspace-invitation.module';
import { WorkspaceModule } from 'src/engine/core-modules/workspace/workspace.module';
import { WorkspaceEventEmitterModule } from 'src/engine/workspace-event-emitter/workspace-event-emitter.module';
import { WorkspaceSSOModule } from 'src/engine/core-modules/sso/sso.module';

import { AnalyticsModule } from './analytics/analytics.module';
import { ClientConfigModule } from './client-config/client-config.module';
Expand Down Expand Up @@ -69,6 +71,7 @@ import { FileModule } from './file/file.module';
ActorModule,
TelemetryModule,
EnvironmentModule.forRoot({}),
RedisClientModule,
FileStorageModule.forRootAsync({
useFactory: fileStorageModuleFactory,
inject: [EnvironmentService],
Expand All @@ -79,7 +82,7 @@ import { FileModule } from './file/file.module';
}),
MessageQueueModule.registerAsync({
useFactory: messageQueueModuleFactory,
inject: [EnvironmentService],
inject: [EnvironmentService, RedisClientService],
}),
ExceptionHandlerModule.forRootAsync({
useFactory: exceptionHandlerModuleFactory,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import { ConnectionOptions } from 'tls';

import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service';
import {
BullMQDriverFactoryOptions,
Expand All @@ -8,6 +6,7 @@ import {
PgBossDriverFactoryOptions,
SyncDriverFactoryOptions,
} from 'src/engine/core-modules/message-queue/interfaces';
import { RedisClientService } from 'src/engine/core-modules/redis-client/redis-client.service';

/**
* MessageQueue Module factory
Expand All @@ -16,6 +15,7 @@ import {
*/
export const messageQueueModuleFactory = async (
environmentService: EnvironmentService,
redisClientService: RedisClientService,
): Promise<MessageQueueModuleOptions> => {
const driverType = environmentService.get('MESSAGE_QUEUE_TYPE');

Expand All @@ -37,18 +37,10 @@ export const messageQueueModuleFactory = async (
} satisfies PgBossDriverFactoryOptions;
}
case MessageQueueDriverType.BullMQ: {
const connectionString = environmentService.get('REDIS_URL');

if (!connectionString) {
throw new Error(
`${MessageQueueDriverType.BullMQ} message queue requires REDIS_URL to be defined, check your .env file`,
);
}

return {
type: MessageQueueDriverType.BullMQ,
options: {
connection: connectionString as ConnectionOptions,
connection: redisClientService.getClient(),
},
} satisfies BullMQDriverFactoryOptions;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { Global, Module } from '@nestjs/common';

import { EnvironmentModule } from 'src/engine/core-modules/environment/environment.module';
import { RedisClientService } from 'src/engine/core-modules/redis-client/redis-client.service';

@Global()
@Module({
imports: [EnvironmentModule],
providers: [RedisClientService],
exports: [RedisClientService],
})
export class RedisClientModule {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { Injectable, OnModuleDestroy } from '@nestjs/common';

import IORedis from 'ioredis';

import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service';

@Injectable()
export class RedisClientService implements OnModuleDestroy {
private redisClient: IORedis | null = null;

constructor(private readonly environmentService: EnvironmentService) {}

getClient() {
if (!this.redisClient) {
const redisUrl = this.environmentService.get('REDIS_URL');

if (!redisUrl) {
throw new Error('REDIS_URL must be defined');
}

this.redisClient = new IORedis(redisUrl);
}

return this.redisClient;
}

async onModuleDestroy() {
if (this.redisClient) {
await this.redisClient.quit();
this.redisClient = null;
}
}
}

0 comments on commit 02c34d5

Please sign in to comment.